美文网首页Golang 开发者
OpenFalcon源码分析(Agent组件)

OpenFalcon源码分析(Agent组件)

作者: Xiao_Yang | 来源:发表于2018-09-17 08:47 被阅读425次

一 基本说明

Agent VERSION "5.1.2"

  • Agent源代码文件目录说明
文件目录说明
  • 调用关系


    调用关系
  • 组件交互关系


    组件交互关系

二 源码分析

Main 入口函数

    # 命令行参数解析
    cfg := flag.String("c", "cfg.json", "configuration file")
    version := flag.Bool("v", false, "show version")
    check := flag.Bool("check", false, "check collector")

    flag.Parse()
    
    # 命令行版本查看
    if *version {
        fmt.Println(g.VERSION)
        os.Exit(0)
    }
    
    # 命令行收集器检测
    if *check {
        funcs.CheckCollector()      #【参考相关详细分析】
        os.Exit(0)
    }

    # Agent配置文件解析,g.Config()可获取完整配置
    g.ParseConfig(*cfg)   #【参考相关详细分析】

    if g.Config().Debug {
        g.InitLog("debug")
    } else {
        g.InitLog("info")
    }
    
    # 全局初始化工作(工作目录/本地IP/RPC_Client)
    g.InitRootDir()             #【参考相关详细分析】
    g.InitLocalIp()             #【参考相关详细分析】
    g.InitRpcClients()          #【参考相关详细分析】

    # 采集功能与模块构造映射Map
    funcs.BuildMappers()        #【参考相关详细分析】
    
    # 
    go cron.InitDataHistory()     #【参考相关详细分析】
    
    # 上报Agent自身状态
    cron.ReportAgentStatus()      #【参考相关详细分析】
    # 同步模块
    cron.SyncMinePlugins()        #【参考相关详细分析】
    cron.SyncBuiltinMetrics()     #【参考相关详细分析】
    cron.SyncTrustableIps()       #【参考相关详细分析】
    # 采集数据
    cron.Collect()                #【参考相关详细分析】

    # API服务器运行
    go http.Start()               #【参考相关详细分析】

    # 阻塞主进程
    select {}

funcs.CheckCollector() 各采集模块功能状态检测

func CheckCollector() {

    output := make(map[string]bool)

   #  引用外部"github.com/toolkits/nux"功能获取
   #  CurrentProcStat()
   #  ListDiskStats()
   #  ListeningPorts()
   #  AllProcs()
   
    _, procStatErr := nux.CurrentProcStat()
    _, listDiskErr := nux.ListDiskStats()
    ports, listeningPortsErr := nux.ListeningPorts()
    procs, psErr := nux.AllProcs()

    # 引用外部"github.com/toolkits/sys"功能执行模块Du系统命令    
    _, duErr := sys.CmdOut("du", "--help")
    
    # 各采集功能模块采集调用与结果判断(各模块采集代码实现可进一步分析)
    output["kernel  "] = len(KernelMetrics()) > 0
    output["df.bytes"] = DeviceMetricsCheck()
    output["net.if  "] = len(CoreNetMetrics([]string{})) > 0
    output["loadavg "] = len(LoadAvgMetrics()) > 0
    output["cpustat "] = procStatErr == nil
    output["disk.io "] = listDiskErr == nil
    output["memory  "] = len(MemMetrics()) > 0
    output["netstat "] = len(NetstatMetrics()) > 0
    output["ss -s   "] = len(SocketStatSummaryMetrics()) > 0
    output["ss -tln "] = listeningPortsErr == nil && len(ports) > 0
    output["ps aux  "] = psErr == nil && len(procs) > 0
    output["du -bs  "] = duErr == nil
   
    # 将MAP output的各值进行结果输出,True为OK,False为fail打印显示
    for k, v := range output {
        status := "fail"
        if v {
            status = "ok"
        }
        fmt.Println(k, "...", status)
    }
}

g.ParseConfig(*cfg) Agent配置文件解析

#通过命令指定配置文件,默认为同级目录下cfg.json
cfg := flag.String("c", "cfg.json", "configuration file")

# ParseConfig()解析cfg配置文件并保存在私有变量config内,可通过
# Config()公开方法来获取GlobalConfig结构
# Config().XXX 常用来获取单个配置项

// ParseConfig reads the JSON configuration file at path cfg, decodes it into
// a GlobalConfig and installs it as the configuration returned by Config().
// The path is also remembered in ConfigFile.
//
// Any failure (empty path, missing file, read error, JSON error) terminates
// the process through log.Fatalln.
func ParseConfig(cfg string) {
    switch {
    case cfg == "":
        log.Fatalln("use -c to specify configuration file")
    case !file.IsExist(cfg):
        log.Fatalln("config file:", cfg, "is not existent. maybe you need `mv cfg.example.json cfg.json`")
    }

    ConfigFile = cfg

    raw, readErr := file.ToTrimString(cfg)
    if readErr != nil {
        log.Fatalln("read config file:", cfg, "fail:", readErr)
    }

    parsed := new(GlobalConfig)
    if parseErr := json.Unmarshal([]byte(raw), parsed); parseErr != nil {
        log.Fatalln("parse config file:", cfg, "fail:", parseErr)
    }

    // Swap the active configuration under the write lock.
    lock.Lock()
    defer lock.Unlock()

    config = parsed

    log.Println("read config file:", cfg, "successfully")
}

#公开方法返回config
// Config returns the configuration most recently installed by ParseConfig.
// The pointer is read under the read lock.
func Config() *GlobalConfig {
    lock.RLock()
    current := config
    lock.RUnlock()
    return current
}

#GlobalConfig结构化配置定义
// GlobalConfig is the structured form of the agent's JSON configuration file.
type GlobalConfig struct {
    // Debug selects the "debug" log level and enables extra log output.
    Debug         bool              `json:"debug"`
    Hostname      string            `json:"hostname"`
    IP            string            `json:"ip"`
    // Plugin holds the plugin settings (Enabled, Dir, LogDir are read in this file).
    Plugin        *PluginConfig     `json:"plugin"`
    // Heartbeat holds the heartbeat (HBS) server settings: Enabled, Addr,
    // Interval (used as seconds) and Timeout (used as milliseconds).
    Heartbeat     *HeartbeatConfig  `json:"heartbeat"`
    // Transfer holds the transfer server settings: Enabled, Addrs,
    // Interval and Timeout (used as milliseconds).
    Transfer      *TransferConfig   `json:"transfer"`
    // Http holds the HTTP API settings: Enabled and Listen.
    Http          *HttpConfig       `json:"http"`
    // Collector holds collector settings such as IfacePrefix.
    Collector     *CollectorConfig  `json:"collector"`
    // DefaultTags are appended to the tags of every metric sent to transfer.
    DefaultTags   map[string]string `json:"default_tags"`
    // IgnoreMetrics lists metric names that collect drops when the value is true.
    IgnoreMetrics map[string]bool   `json:"ignore"`
}

g 全局化初始化

    g.InitRootDir()  // working directory
    g.InitLocalIp()  // local IP address
    g.InitRpcClients() // create the heartbeat RPC client


# 通过os.Getwd()方式获取当前工作目录
// Root is the agent's working directory, filled in by InitRootDir.
var Root string

// InitRootDir stores the current working directory in Root. If the
// directory cannot be determined the process exits through log.Fatalln.
func InitRootDir() {
    wd, err := os.Getwd()
    if err != nil {
        log.Fatalln("getwd fail:", err)
    }
    Root = wd
}

# 通过net.DialTimeout方式获取本地IP地址
// LocalIp is the local address of the connection the agent opens to the
// heartbeat server. It stays empty when heartbeat is disabled or the
// address cannot be determined.
var LocalIp string

// InitLocalIp dials the configured heartbeat address over TCP (10 second
// timeout) and records the local end of that connection in LocalIp.
//
// The host part is extracted with net.SplitHostPort rather than by splitting
// on ":", so an IPv6 local address such as "[fe80::1]:1234" yields
// "fe80::1" instead of "[fe80".
func InitLocalIp() {
    if !Config().Heartbeat.Enabled {
        log.Println("hearbeat is not enabled, can't get localip")
        return
    }

    conn, err := net.DialTimeout("tcp", Config().Heartbeat.Addr, time.Second*10)
    if err != nil {
        // Include the cause; the message alone does not say why the dial failed.
        log.Println("get local addr failed !", err)
        return
    }
    defer conn.Close()

    host, _, err := net.SplitHostPort(conn.LocalAddr().String())
    if err != nil {
        log.Println("get local addr failed !", err)
        return
    }
    LocalIp = host
}

# 构造&SingleConnRpcClient{}对象
// HbsClient is the RPC client used to talk to the heartbeat server. It is
// left nil when heartbeat is disabled.
var HbsClient *SingleConnRpcClient

// InitRpcClients creates HbsClient from the heartbeat address and timeout
// (milliseconds) in the configuration. It does nothing when heartbeat is
// disabled.
func InitRpcClients() {
    if !Config().Heartbeat.Enabled {
        return
    }

    HbsClient = &SingleConnRpcClient{
        RpcServer: Config().Heartbeat.Addr,
        Timeout:   time.Duration(Config().Heartbeat.Timeout) * time.Millisecond,
    }
}


// SingleConnRpcClient wraps one *rpc.Client for a single server address.
// The embedded mutex makes Lock and Unlock available on the client itself.
type SingleConnRpcClient struct {
    sync.Mutex
    rpcClient *rpc.Client
    RpcServer string        // address of the RPC server
    Timeout   time.Duration // timeout; callers in this file build it from a millisecond value
}


funcs.BuildMappers() 构建采集功能映射表,表内每项代表一个采集项
重要模块:采集映射表

func BuildMappers() {
    interval := g.Config().Transfer.Interval
    Mappers = []FuncsAndInterval{
        {   # 基础项采集 
            Fs: []func() []*model.MetricValue{
                AgentMetrics,   // Agent状态
                CpuMetrics,     // CPU
                NetMetrics,     // NET
                KernelMetrics,  // Kernel
                LoadAvgMetrics, // CPU Load Avg
                MemMetrics,     // Memory
                DiskIOMetrics,  // Disk IO
                IOStatsMetrics, // IO stats
                NetstatMetrics, // NetStat
                ProcMetrics,    // Proc
                UdpMetrics,     // Udp
            },
            Interval: interval,
        },
        {  # 设备采集
            Fs: []func() []*model.MetricValue{
                DeviceMetrics,
            },
            Interval: interval,
        },
        {  # Socket与port采集
            Fs: []func() []*model.MetricValue{
                PortMetrics,
                SocketStatSummaryMetrics,
            },
            Interval: interval,
        },
        {   # Du磁盘采集
            Fs: []func() []*model.MetricValue{
                DuMetrics,
            },
            Interval: interval,
        },
        {    # Url采集
            Fs: []func() []*model.MetricValue{
                UrlMetrics,
            },
            Interval: interval,
        },
        {   # Gpu采集
            Fs: []func() []*model.MetricValue{
                GpuMetrics,
            },
            Interval: interval,
        },
    }
}

# 单个采集功能项
// FuncsAndInterval is one entry of the collector table: a group of
// collector functions that share one collection interval.
type FuncsAndInterval struct {
    Fs       []func() []*model.MetricValue  // collector functions of this group
    Interval int   // collection interval; Collect passes it to collect as seconds
}

# 完整映射列表
// Mappers is the complete collector table; BuildMappers assigns it.
var Mappers []FuncsAndInterval   

cron.InitDataHistory() 周期性更新CPU/Disk状态,构建历史数据

func InitDataHistory() {
    for {
        funcs.UpdateCpuStat()  
        funcs.UpdateDiskStats()
        time.Sleep(g.COLLECT_INTERVAL)
    }
}

# funcs->cpustat更新CPU状态,最新状态保存在procStatHistory[0]
// UpdateCpuStat takes a fresh CPU sample and pushes it onto the front of
// procStatHistory: every existing sample moves one slot towards the end
// (the oldest is dropped) and the new one is stored at index 0.
// It returns the error from nux.CurrentProcStat, leaving the history
// untouched in that case.
func UpdateCpuStat() error {
    sample, err := nux.CurrentProcStat()
    if err != nil {
        return err
    }

    psLock.Lock()
    defer psLock.Unlock()

    // copy handles the overlapping ranges, so this is the same shift the
    // back-to-front element loop would perform.
    copy(procStatHistory[1:historyCount], procStatHistory[:historyCount-1])
    procStatHistory[0] = sample
    return nil
}

# funcs->diskstats更新Disk状态
// UpdateDiskStats takes a fresh list of disk samples and updates
// diskStatsMap per device: slot 0 receives the new sample and slot 1 the
// sample that was previously in slot 0 (nil for a device seen for the first
// time). It returns the error from nux.ListDiskStats without touching the map.
func UpdateDiskStats() error {
    samples, err := nux.ListDiskStats()
    if err != nil {
        return err
    }

    dsLock.Lock()
    defer dsLock.Unlock()
    for _, current := range samples {
        previous := diskStatsMap[current.Device][0]
        diskStatsMap[current.Device] = [2]*nux.DiskStats{current, previous}
    }
    return nil
}

cron.ReportAgentStatus() 周期性向HBS服务器上报状态

// ReportAgentStatus starts the background status reporter when heartbeat is
// enabled and has an address; otherwise it does nothing. The reporting
// period is the heartbeat interval in seconds.
func ReportAgentStatus() {
    if !g.Config().Heartbeat.Enabled || g.Config().Heartbeat.Addr == "" {
        return
    }

    period := time.Duration(g.Config().Heartbeat.Interval) * time.Second
    go reportAgentStatus(period)
}

# 通过RPC方式上报Agent信息
// reportAgentStatus loops forever, sending an "Agent.ReportStatus" RPC to
// the heartbeat server and then sleeping for interval.
//
// If the hostname cannot be resolved, the error text is sent in its place.
// A failed call or a non-zero response code is only logged.
func reportAgentStatus(interval time.Duration) {
    for ; ; time.Sleep(interval) {
        hostname, err := g.Hostname()
        if err != nil {
            hostname = fmt.Sprintf("error:%s", err.Error())
        }

        req := model.AgentReportRequest{
            Hostname:      hostname,
            IP:            g.IP(),
            AgentVersion:  g.VERSION,
            PluginVersion: g.GetCurrPluginVersion(), // git HEAD hash of the plugin directory
        }

        var resp model.SimpleRpcResponse
        if err = g.HbsClient.Call("Agent.ReportStatus", req, &resp); err != nil || resp.Code != 0 {
            log.Println("call Agent.ReportStatus fail:", err, "Request:", req, "Response:", resp)
        }
    }
}


//g->tool模块获取当前插件Hash信息
// GetCurrPluginVersion returns the output of "git rev-parse HEAD" run in
// the plugin directory, with surrounding whitespace trimmed.
//
// Instead of a hash it returns "plugin not enabled" when plugins are
// disabled, "plugin dir not existent" when the directory is missing, or
// "Error:<cause>" when the git command fails.
func GetCurrPluginVersion() string {
    if !Config().Plugin.Enabled {
        return "plugin not enabled"
    }

    dir := Config().Plugin.Dir
    if !file.IsExist(dir) {
        return "plugin dir not existent"
    }

    var stdout bytes.Buffer
    gitCmd := exec.Command("git", "rev-parse", "HEAD")
    gitCmd.Dir = dir // run inside the plugin directory
    gitCmd.Stdout = &stdout

    if runErr := gitCmd.Run(); runErr != nil {
        return fmt.Sprintf("Error:%s", runErr.Error())
    }
    return strings.TrimSpace(stdout.String())
}

cron.SyncMinePlugins() 周期性向HBS服务器同步插件,执行插件采集数据并向Transfer上报数据。【重要功能模块:插件管理】


# 公开同步插件方法
// SyncMinePlugins starts the background plugin synchroniser, but only when
// plugins are enabled, heartbeat is enabled and a heartbeat address is set.
func SyncMinePlugins() {
    ready := g.Config().Plugin.Enabled &&
        g.Config().Heartbeat.Enabled &&
        g.Config().Heartbeat.Addr != ""
    if ready {
        go syncMinePlugins() // runs in the background
    }
}

## RPC向HBS同步插件
// syncMinePlugins loops forever. Every heartbeat interval (seconds) it asks
// the heartbeat server, via the "Agent.MinePlugins" RPC, which plugin
// directories this host should run, and reconciles the local plugin set.
//
// A round is skipped when the hostname cannot be resolved, the RPC fails
// (logged), or the response timestamp is not newer than the last one applied.
func syncMinePlugins() {
    lastApplied := int64(-1)
    interval := time.Duration(g.Config().Heartbeat.Interval) * time.Second

    for {
        time.Sleep(interval)

        hostname, err := g.Hostname()
        if err != nil {
            continue
        }

        var resp model.AgentPluginsResponse
        req := model.AgentHeartbeatRequest{Hostname: hostname}
        if err = g.HbsClient.Call("Agent.MinePlugins", req, &resp); err != nil {
            log.Println("ERROR:", err)
            continue
        }

        if resp.Timestamp <= lastApplied {
            continue
        }
        lastApplied = resp.Timestamp
        dirs := resp.Plugins

        if g.Config().Debug {
            log.Println(&resp)
        }

        if len(dirs) == 0 {
            plugins.ClearAllPlugins()
        }

        // Merge the plugins found under every assigned directory into one map.
        wanted := make(map[string]*plugins.Plugin)
        for _, dir := range dirs {
            for path, plugin := range plugins.ListPlugins(strings.Trim(dir, "/")) {
                wanted[path] = plugin
            }
        }

        plugins.DelNoUsePlugins(wanted) // drop plugins that are gone or changed
        plugins.AddNewPlugins(wanted)   // schedule plugins that are new or changed
    }
}


### plugins->plugins清理Agent插件(HBS服务器已不存在的插件)
// DelNoUsePlugins deletes every registered plugin that is missing from
// newPlugins or whose MTime differs from the entry in newPlugins.
func DelNoUsePlugins(newPlugins map[string]*Plugin) {
    for path, current := range Plugins {
        if replacement, kept := newPlugins[path]; kept && current.MTime == replacement.MTime {
            continue // unchanged: keep it running
        }
        deletePlugin(path)
    }
}

### plugins->plugins更新和添加插件
// AddNewPlugins registers and schedules every plugin in newPlugins that is
// not yet registered or whose MTime differs from the registered one.
// Plugins already registered with the same MTime are left alone.
func AddNewPlugins(newPlugins map[string]*Plugin) {
    for fpath, candidate := range newPlugins {
        if existing, known := Plugins[fpath]; known && candidate.MTime == existing.MTime {
            continue
        }

        Plugins[fpath] = candidate
        scheduler := NewPluginScheduler(candidate)
        PluginsWithScheduler[fpath] = scheduler
        scheduler.Schedule() // starts the ticker-driven run loop
    }
}



### 公有插件变量
// Package-level plugin registries, both keyed by the plugin's file path.
var (
    // Plugins holds the currently registered plugins.
    Plugins              = make(map[string]*Plugin)
    // PluginsWithScheduler holds the scheduler created for each registered plugin.
    PluginsWithScheduler = make(map[string]*PluginScheduler)
)


#### plugins->scheduler插件调度
// Schedule starts a goroutine that runs the scheduler's plugin on every
// tick of its Ticker. The goroutine stops the ticker and exits when a value
// arrives on Quit.
func (s *PluginScheduler) Schedule() {
    go func() {
        for {
            select {
            case <-s.Quit:
                s.Ticker.Stop()
                return
            case <-s.Ticker.C:
                PluginRun(s.Plugin)
            }
        }
    }()
}

##### plugins->scheduler插件执行
// PluginRun executes one plugin file and forwards its output to transfer.
//
// The plugin path is plugin.FilePath resolved under the configured plugin
// directory. The process runs in its own process group and is given
// plugin.Cycle*1000 - 500 milliseconds to finish. Anything written to
// stderr is saved to "<FilePath>.stderr.log" in the plugin log directory.
// Stdout must be a JSON array of metric values; it is decoded and passed to
// g.SendToTransfer.
//
// The function returns without sending when the file is missing, the
// process cannot be started, it times out, it fails, stdout is empty, or
// stdout is not valid JSON.
func PluginRun(plugin *Plugin) {

    timeout := plugin.Cycle*1000 - 500 // run budget in milliseconds
    fpath := filepath.Join(g.Config().Plugin.Dir, plugin.FilePath)

    if !file.IsExist(fpath) {
        log.Println("no such plugin:", fpath)
        return
    }

    debug := g.Config().Debug
    if debug {
        log.Println(fpath, "running...")
    }

    cmd := exec.Command(fpath)
    var stdout bytes.Buffer
    cmd.Stdout = &stdout
    var stderr bytes.Buffer
    cmd.Stderr = &stderr
    cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
    err := cmd.Start()
    if err != nil {
        log.Printf("[ERROR] plugin start fail, error: %s\n", err)
        return
    }
    if debug {
        log.Println("plugin started:", fpath)
    }

    err, isTimeout := sys.CmdRunWithTimeout(cmd, time.Duration(timeout)*time.Millisecond)

    errStr := stderr.String()
    if errStr != "" {
        logFile := filepath.Join(g.Config().Plugin.LogDir, plugin.FilePath+".stderr.log")
        // Use a separate variable for the write error: assigning it to err
        // would overwrite the plugin's own exec/timeout error, so a failed
        // plugin whose stderr was saved successfully would look successful.
        if _, writeErr := file.WriteString(logFile, errStr); writeErr != nil {
            log.Printf("[ERROR] write log to %s fail, error: %s\n", logFile, writeErr)
        }
    }

    if isTimeout {
        // The process has been killed; err reports whether the kill worked.
        if err == nil && debug {
            log.Println("[INFO] timeout and kill process", fpath, "successfully")
        }

        if err != nil {
            log.Println("[ERROR] kill process", fpath, "occur error:", err)
        }

        return
    }

    if err != nil {
        log.Println("[ERROR] exec plugin", fpath, "fail. error:", err)
        return
    }

    // The plugin ran successfully.
    data := stdout.Bytes()
    if len(data) == 0 {
        if debug {
            log.Println("[DEBUG] stdout of", fpath, "is blank")
        }
        return
    }

    var metrics []*model.MetricValue
    err = json.Unmarshal(data, &metrics) // decode the plugin's JSON output
    if err != nil {
        log.Printf("[ERROR] json.Unmarshal stdout of %s fail. error:%s stdout: \n%s\n", fpath, err, stdout.String())
        return
    }

    g.SendToTransfer(metrics)
}

g.SendToTransfer() 向transfer上报数据

#g->var向transfer服务器上报数据
// SendToTransfer appends the configured default tags to every metric and
// sends the batch through SendMetrics. An empty batch is ignored.
//
// Default tags are rendered as comma-separated "key=value" pairs in map
// iteration order. A metric without tags receives exactly that string;
// otherwise it is appended to the existing tags after a comma. The metrics
// are modified in place. In debug mode the batch size, the first metric and
// the response are logged.
func SendToTransfer(metrics []*model.MetricValue) {
    if len(metrics) == 0 {
        return
    }

    if defaults := Config().DefaultTags; len(defaults) > 0 {
        pairs := []string{}
        for key, value := range defaults {
            pairs = append(pairs, key+"="+value)
        }
        suffix := strings.Join(pairs, ",")

        for _, m := range metrics {
            if m.Tags == "" {
                m.Tags = suffix
                continue
            }
            m.Tags = m.Tags + "," + suffix
        }
    }

    debug := Config().Debug

    if debug {
        log.Printf("=> <Total=%d> %v\n", len(metrics), metrics[0])
    }

    var resp model.TransferResponse
    SendMetrics(metrics, &resp)
    if debug {
        log.Println("<=", &resp)
    }
}


# g->transfer向transfer服务器发送采集数据
// SendMetrics tries the configured transfer addresses in random order and
// stops at the first one that accepts the batch. resp receives the reply of
// the successful call. If every address fails, the function returns after
// the last attempt; each failure is logged by updateMetrics.
func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) {
    rand.Seed(time.Now().UnixNano())

    // Take the address list once. Config() returns whatever configuration is
    // current, and a reload may swap it between two calls; indexing a second,
    // shorter list with an index drawn from the first could go out of range.
    addrs := Config().Transfer.Addrs

    for _, i := range rand.Perm(len(addrs)) {
        addr := addrs[i]

        // Reuse the cached client for this address, creating it on first use.
        c := getTransferClient(addr)
        if c == nil {
            c = initTransferClient(addr)
        }

        if updateMetrics(c, metrics, resp) {
            break
        }
    }
}

## 获取可用的TransferClient对象
// getTransferClient returns the cached client for addr, or nil when none
// has been created yet. The lookup happens under the read lock.
func getTransferClient(addr string) *SingleConnRpcClient {
    TransferClientsLock.RLock()
    client := TransferClients[addr] // a missing key yields a nil pointer
    TransferClientsLock.RUnlock()
    return client
}

## 构建一个新的TransferClient对象
// initTransferClient creates a client for addr using the transfer timeout
// (milliseconds) from the configuration, stores it in TransferClients under
// the write lock, and returns it.
func initTransferClient(addr string) *SingleConnRpcClient {
    client := &SingleConnRpcClient{
        RpcServer: addr,
        Timeout:   time.Duration(Config().Transfer.Timeout) * time.Millisecond,
    }

    TransferClientsLock.Lock()
    TransferClients[addr] = client
    TransferClientsLock.Unlock()

    return client
}

## RPC向Transfer服务器上报metrics数据,RPC类型"Transfer.Update"
// updateMetrics sends the batch to one transfer server with the
// "Transfer.Update" RPC. It reports whether the call succeeded; a failure
// is logged together with the client.
func updateMetrics(c *SingleConnRpcClient, metrics []*model.MetricValue, resp *model.TransferResponse) bool {
    if callErr := c.Call("Transfer.Update", metrics, resp); callErr != nil {
        log.Println("call Transfer.Update fail:", c, callErr)
        return false
    }
    return true
}

cron.SyncBuiltinMetrics() 向HBS服务器同步监控端口、du路径、进程和URL


// SyncBuiltinMetrics starts the background built-in metric synchroniser
// when heartbeat is enabled and has an address; otherwise it does nothing.
func SyncBuiltinMetrics() {
    if !g.Config().Heartbeat.Enabled || g.Config().Heartbeat.Addr == "" {
        return
    }
    go syncBuiltinMetrics()
}

# 向HBS同步
// syncBuiltinMetrics loops forever. Every heartbeat interval (seconds) it
// calls the "Agent.BuiltinMetrics" RPC on the heartbeat server and turns the
// returned metric definitions into four collections: URLs to check, ports
// to watch, du paths, and processes to count. These are stored through the
// g.SetReport*/g.SetDuPaths setters.
//
// A round is skipped when the hostname cannot be resolved, the RPC fails
// (logged), the response timestamp is not newer than the last one applied,
// or the response checksum equals the last one applied.
func syncBuiltinMetrics() {

    var timestamp int64 = -1
    var checksum string = "nil"

    duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second

    for {
        time.Sleep(duration)

        // Fresh collections per round, so entries from a previous response
        // never carry over.
        var ports = []int64{}
        var paths = []string{}
        var procs = make(map[string]map[int]string)
        var urls = make(map[string]string)

        hostname, err := g.Hostname()
        if err != nil {
            continue
        }

        req := model.AgentHeartbeatRequest{
            Hostname: hostname,
            Checksum: checksum,
        }

        var resp model.BuiltinMetricResponse
        
        // RPC method "Agent.BuiltinMetrics"
        err = g.HbsClient.Call("Agent.BuiltinMetrics", req, &resp)
        if err != nil {
            log.Println("ERROR:", err)
            continue
        }

        if resp.Timestamp <= timestamp {
            continue
        }

        if resp.Checksum == checksum {
            continue
        }

        timestamp = resp.Timestamp
        checksum = resp.Checksum

        for _, metric := range resp.Metrics {
            
            // URL health checks: tags must be exactly two comma-separated
            // "key=value" pairs; the first value is the URL, the second must
            // parse as an integer and is stored as a string. Unlike the port
            // and du branches, this branch has no trailing continue, so
            // control falls through to the checks below.
            if metric.Metric == g.URL_CHECK_HEALTH {
                arr := strings.Split(metric.Tags, ",")
                if len(arr) != 2 {
                    continue
                }
                url := strings.Split(arr[0], "=")
                if len(url) != 2 {
                    continue
                }
                stime := strings.Split(arr[1], "=")
                if len(stime) != 2 {
                    continue
                }
                if _, err := strconv.ParseInt(stime[1], 10, 64); err == nil {
                    urls[url[1]] = stime[1]
                } else {
                    log.Println("metric ParseInt timeout failed:", err)
                }
            }
           
            // Listening ports: tags must be a single "key=value" pair whose
            // value parses as an integer.
            if metric.Metric == g.NET_PORT_LISTEN {
                arr := strings.Split(metric.Tags, "=")
                if len(arr) != 2 {
                    continue
                }

                if port, err := strconv.ParseInt(arr[1], 10, 64); err == nil {
                    ports = append(ports, port)
                } else {
                    log.Println("metrics ParseInt failed:", err)
                }

                continue
            }
             
             // du paths: tags must be a single "key=value" pair; the trimmed
             // value is the path.
            if metric.Metric == g.DU_BS {
                arr := strings.Split(metric.Tags, "=")
                if len(arr) != 2 {
                    continue
                }

                paths = append(paths, strings.TrimSpace(arr[1]))
                continue
            }
            
            // Processes: the "name=" value is stored under key 1 and the
            // "cmdline=" value under key 2; the result is keyed by the raw
            // tag string.
            if metric.Metric == g.PROC_NUM {
                arr := strings.Split(metric.Tags, ",")

                tmpMap := make(map[int]string)

                for i := 0; i < len(arr); i++ {
                    if strings.HasPrefix(arr[i], "name=") {
                        tmpMap[1] = strings.TrimSpace(arr[i][5:])
                    } else if strings.HasPrefix(arr[i], "cmdline=") {
                        tmpMap[2] = strings.TrimSpace(arr[i][8:])
                    }
                }

                procs[metric.Tags] = tmpMap
            }
        }

        // Store the synchronised data.
        g.SetReportUrls(urls)   
        g.SetReportPorts(ports) 
        g.SetReportProcs(procs) 
        g.SetDuPaths(paths)     

    }
}

cron.SyncTrustableIps() 向HBS同步执行脚本的信任IP列表

// SyncTrustableIps starts the background trusted-IP synchroniser when
// heartbeat is enabled and has an address; otherwise it does nothing.
func SyncTrustableIps() {
    if !g.Config().Heartbeat.Enabled || g.Config().Heartbeat.Addr == "" {
        return
    }
    go syncTrustableIps()
}

// syncTrustableIps loops forever. Every heartbeat interval (seconds) it
// fetches the trusted IP list from the heartbeat server with the
// "Agent.TrustableIps" RPC and stores it via g.SetTrustableIps. A failed
// call is logged and the stored list is left unchanged.
func syncTrustableIps() {
    interval := time.Duration(g.Config().Heartbeat.Interval) * time.Second

    for {
        time.Sleep(interval)

        var ips string
        if callErr := g.HbsClient.Call("Agent.TrustableIps", model.NullRpcRequest{}, &ips); callErr != nil {
            log.Println("ERROR: call Agent.TrustableIps fail", callErr)
            continue
        }

        g.SetTrustableIps(ips)
    }
}

cron.Collect() 周期性采集数据

// Collect starts one background collector goroutine per entry of
// funcs.Mappers. It does nothing when transfer is disabled or no transfer
// address is configured.
func Collect() {
    if !g.Config().Transfer.Enabled || len(g.Config().Transfer.Addrs) == 0 {
        return
    }

    for _, group := range funcs.Mappers {
        go collect(int64(group.Interval), group.Fs)
    }
}

## 采集与向transfer上报数据 【`关键执行收集与上报Codes`】
// collect runs forever: every sec seconds it calls each function in fns,
// drops metrics whose name is marked true in the configured ignore list,
// stamps the rest with the step, the hostname and the current Unix time,
// and hands the batch to g.SendToTransfer.
//
// A tick is skipped entirely when the hostname cannot be resolved.
func collect(sec int64, fns []func() []*model.MetricValue) {
    ticker := time.NewTicker(time.Duration(sec) * time.Second)
    defer ticker.Stop()

    for {
        <-ticker.C

        hostname, err := g.Hostname()
        if err != nil {
            continue
        }

        batch := []*model.MetricValue{}
        ignored := g.Config().IgnoreMetrics

        // Run every collector of this group and gather what is not ignored.
        // A nil or empty result simply contributes nothing.
        for _, fn := range fns {
            for _, mv := range fn() {
                if ignored[mv.Metric] {
                    continue
                }
                batch = append(batch, mv)
            }
        }

        now := time.Now().Unix()

        // Every metric of one tick shares the same endpoint and timestamp.
        for _, mv := range batch {
            mv.Step = sec
            mv.Endpoint = hostname
            mv.Timestamp = now
        }

        g.SendToTransfer(batch)
    }
}

http.Start() API与Dashboard服务运行


// init registers all HTTP routes of the agent, in the order listed.
func init() {
    registrars := []func(){
        configAdminRoutes,  // admin endpoints
        configCpuRoutes,    // CPU information
        configDfRoutes,     // df mount-point information
        configHealthRoutes, // health
        configIoStatRoutes, // iostat
        configKernelRoutes, // kernel
        configMemoryRoutes, // memory
        configPageRoutes,   // dashboard pages
        configPluginRoutes, // plugins
        configPushRoutes,   // push
        configRunRoutes,    // script execution
        configSystemRoutes, // system
    }
    for _, register := range registrars {
        register()
    }
}

// Start runs the HTTP server on the configured listen address. It returns
// immediately when HTTP is disabled or no address is configured. Otherwise
// it blocks in ListenAndServe, and the process exits through log.Fatalln
// once that call returns.
func Start() {
    if !g.Config().Http.Enabled {
        return
    }

    listen := g.Config().Http.Listen
    if listen == "" {
        return
    }

    server := new(http.Server)
    server.Addr = listen
    server.MaxHeaderBytes = 1 << 30

    log.Println("listening", listen)
    log.Fatalln(server.ListenAndServe())
}

# 配置http路由(Admin管理)
// configAdminRoutes registers the admin endpoints:
//
//   /exit           exits the process one second after replying (trusted callers only)
//   /config/reload  re-parses the configuration file and returns it (trusted callers only)
//   /workdir        returns file.SelfDir()
//   /ips            returns the trusted IP list
//
// Untrusted callers of the first two endpoints receive "no privilege".
func configAdminRoutes() {
    http.HandleFunc("/exit", func(w http.ResponseWriter, r *http.Request) {
        if !g.IsTrustable(r.RemoteAddr) {
            w.Write([]byte("no privilege"))
            return
        }

        w.Write([]byte("exiting..."))
        // Exit from a separate goroutine after a delay so the handler can return first.
        go func() {
            time.Sleep(time.Second)
            os.Exit(0)
        }()
    })

    http.HandleFunc("/config/reload", func(w http.ResponseWriter, r *http.Request) {
        if !g.IsTrustable(r.RemoteAddr) {
            w.Write([]byte("no privilege"))
            return
        }

        g.ParseConfig(g.ConfigFile)
        RenderDataJson(w, g.Config())
    })

    http.HandleFunc("/workdir", func(w http.ResponseWriter, r *http.Request) {
        RenderDataJson(w, file.SelfDir())
    })

    http.HandleFunc("/ips", func(w http.ResponseWriter, r *http.Request) {
        RenderDataJson(w, g.TrustableIps())
    })
}

# 配置http路由(Dashboard静态页)
// configPageRoutes serves static dashboard files from "<g.Root>/public".
// A request whose path ends in "/" gets a 404 unless an index.html exists
// under that path.
func configPageRoutes() {
    http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        wantsIndex := strings.HasSuffix(r.URL.Path, "/")
        if wantsIndex && !file.IsExist(filepath.Join(g.Root, "/public", r.URL.Path, "index.html")) {
            http.NotFound(w, r)
            return
        }

        // g.Root is read on every request rather than once at registration time.
        publicDir := http.Dir(filepath.Join(g.Root, "/public"))
        http.FileServer(publicDir).ServeHTTP(w, r)
    })
}

三 默认采集模块分析

  1. AgentMetrics -- agent alive状态采集
// AgentMetrics returns the single gauge "agent.alive" with value 1.
func AgentMetrics() []*model.MetricValue {
    alive := GaugeValue("agent.alive", 1)
    return []*model.MetricValue{alive}
}
  2. CpuMetrics -- CPU相关信息采集
# 在main入口后台线程已周期性收集CPU状态,保存在私有procStatHistory内
# go cron.InitDataHistory()-> UpdateCpuStat() 
# 解析procStatHistory历史数据
# 比如CpuIdle值获取
// CpuIdle returns the idle share, in percent, between the two most recent
// CPU samples: the change in idle time divided by deltaTotal(). It returns
// 0 when deltaTotal() is 0.
func CpuIdle() float64 {
    psLock.RLock()
    defer psLock.RUnlock()

    total := deltaTotal()
    if total == 0 {
        return 0.0
    }

    idleDelta := procStatHistory[0].Cpu.Idle - procStatHistory[1].Cpu.Idle
    percentPerUnit := 100.00 / float64(total)
    return float64(idleDelta) * percentPerUnit
}


func CpuMetrics() []*model.MetricValue {
    if !CpuPrepared() {
        return []*model.MetricValue{}
    }

    # CPU信息: idle, busy, user, nice, system, iowait, irq, softirq, steal, guest, switches
    
    cpuIdleVal := CpuIdle()
    idle := GaugeValue("cpu.idle", cpuIdleVal) //空闲值
    busy := GaugeValue("cpu.busy", 100.0-cpuIdleVal) //繁忙值
    user := GaugeValue("cpu.user", CpuUser()) // 用户使用CPU
    nice := GaugeValue("cpu.nice", CpuNice()) //CPU NICE值
    system := GaugeValue("cpu.system", CpuSystem()) //系统使用CPU
    iowait := GaugeValue("cpu.iowait", CpuIowait()) //IO等待
    irq := GaugeValue("cpu.irq", CpuIrq()) //IO中断请求
    softirq := GaugeValue("cpu.softirq", CpuSoftIrq()) //软中断请求
    steal := GaugeValue("cpu.steal", CpuSteal()) //Steal值
    guest := GaugeValue("cpu.guest", CpuGuest()) //Guest值
    switches := CounterValue("cpu.switches",   CurrentCpuSwitches())  //交换值
    return []*model.MetricValue{idle, busy, user, nice, system, iowait, irq, softirq, steal, guest, switches}
}
  3. NetMetrics -- 网卡相关信息采集
# 依据配置网卡接口信息采集数据
// NetMetrics collects network interface metrics, passing the configured
// interface prefixes to CoreNetMetrics.
func NetMetrics() []*model.MetricValue {
    prefixes := g.Config().Collector.IfacePrefix
    return CoreNetMetrics(prefixes)
}

#引用"github.com/toolkits/nux"采集网卡信息
#实现读取与解析"/proc/net/dev"信息

// CoreNetMetrics returns 26 metrics per interface reported by
// nux.NetIfs(ifacePrefix), each tagged "iface=<name>". Interfaces appear in
// the order nux returns them, and the metrics of one interface in the order
// listed below. If nux.NetIfs fails, the error is logged and an empty slice
// is returned.
func CoreNetMetrics(ifacePrefix []string) []*model.MetricValue {

    netIfs, err := nux.NetIfs(ifacePrefix)
    if err != nil {
        log.Println(err)
        return []*model.MetricValue{}
    }

    const perIface = 26
    ret := make([]*model.MetricValue, 0, len(netIfs)*perIface)

    for _, netIf := range netIfs {
        iface := "iface=" + netIf.Iface
        ret = append(ret,
            // inbound
            CounterValue("net.if.in.bytes", netIf.InBytes, iface),
            CounterValue("net.if.in.packets", netIf.InPackages, iface),
            CounterValue("net.if.in.errors", netIf.InErrors, iface),
            CounterValue("net.if.in.dropped", netIf.InDropped, iface),
            CounterValue("net.if.in.fifo.errs", netIf.InFifoErrs, iface),
            CounterValue("net.if.in.frame.errs", netIf.InFrameErrs, iface),
            CounterValue("net.if.in.compressed", netIf.InCompressed, iface),
            CounterValue("net.if.in.multicast", netIf.InMulticast, iface),
            // outbound
            CounterValue("net.if.out.bytes", netIf.OutBytes, iface),
            CounterValue("net.if.out.packets", netIf.OutPackages, iface),
            CounterValue("net.if.out.errors", netIf.OutErrors, iface),
            CounterValue("net.if.out.dropped", netIf.OutDropped, iface),
            CounterValue("net.if.out.fifo.errs", netIf.OutFifoErrs, iface),
            CounterValue("net.if.out.collisions", netIf.OutCollisions, iface),
            CounterValue("net.if.out.carrier.errs", netIf.OutCarrierErrs, iface),
            CounterValue("net.if.out.compressed", netIf.OutCompressed, iface),
            // totals
            CounterValue("net.if.total.bytes", netIf.TotalBytes, iface),
            CounterValue("net.if.total.packets", netIf.TotalPackages, iface),
            CounterValue("net.if.total.errors", netIf.TotalErrors, iface),
            CounterValue("net.if.total.dropped", netIf.TotalDropped, iface),
            // link speed and utilisation
            GaugeValue("net.if.speed.bits", netIf.SpeedBits, iface),
            CounterValue("net.if.in.percent", netIf.InPercent, iface),
            CounterValue("net.if.out.percent", netIf.OutPercent, iface),
            // byte counters expressed in bits (bytes * 8)
            CounterValue("net.if.in.bits", netIf.InBytes*8, iface),
            CounterValue("net.if.out.bits", netIf.OutBytes*8, iface),
            CounterValue("net.if.total.bits", netIf.TotalBytes*8, iface),
        )
    }
    return ret
}
  4. KernelMetrics -- 系统内核相关信息采集
#引用"github.com/toolkits/nux"采集内核信息
#实现读取"/proc/sys/fs/file-max" 
# "/proc/sys/fs/file-nr"  
# "/proc/sys/kernel/pid_max" 

// KernelMetrics returns the gauges kernel.maxfiles, kernel.maxproc,
// kernel.files.allocated and kernel.files.left (maxfiles minus allocated),
// in that order.
//
// The three values are read one after another; when a read fails the error
// is logged and only the metrics gathered before it are returned.
func KernelMetrics() (L []*model.MetricValue) {

    maxFiles, err := nux.KernelMaxFiles()
    if err != nil {
        log.Println(err)
        return L
    }
    L = append(L, GaugeValue("kernel.maxfiles", maxFiles))

    maxProc, err := nux.KernelMaxProc()
    if err != nil {
        log.Println(err)
        return L
    }
    L = append(L, GaugeValue("kernel.maxproc", maxProc))

    allocateFiles, err := nux.KernelAllocateFiles()
    if err != nil {
        log.Println(err)
        return L
    }

    return append(L,
        GaugeValue("kernel.files.allocated", allocateFiles),
        GaugeValue("kernel.files.left", maxFiles-allocateFiles),
    )
}
  5. LoadAvgMetrics -- CPU负载信息采集
#引用"github.com/toolkits/nux"采集CPU负载信息
#实现读取与解析 "/proc/loadavg"信息 
#

// LoadAvgMetrics returns the gauges load.1min, load.5min and load.15min
// from nux.LoadAvg. If that call fails, the error is logged and nil is
// returned.
func LoadAvgMetrics() []*model.MetricValue {
    load, err := nux.LoadAvg()
    if err != nil {
        log.Println(err)
        return nil
    }

    oneMin := GaugeValue("load.1min", load.Avg1min)
    fiveMin := GaugeValue("load.5min", load.Avg5min)
    fifteenMin := GaugeValue("load.15min", load.Avg15min)
    return []*model.MetricValue{oneMin, fiveMin, fifteenMin}
}
  6. MemMetrics -- 内存信息采集
#引用"github.com/toolkits/nux"采集内存信息
#实现读取与解析"/proc/meminfo"信息

// MemMetrics reports memory and swap usage based on nux.MemInfo.
//
// "Free" memory is computed as MemFree + Buffers + Cached, and "used" memory
// as MemTotal minus that sum. Percentages are left at 0 when the matching
// total is 0. It logs the error and returns nil when memory info cannot be
// read.
func MemMetrics() []*model.MetricValue {
    info, err := nux.MemInfo()
    if err != nil {
        log.Println(err)
        return nil
    }

    // Buffers and page cache are counted as available memory.
    available := info.MemFree + info.Buffers + info.Cached
    inUse := info.MemTotal - available

    var memFreePct, memUsedPct float64
    if info.MemTotal != 0 {
        total := float64(info.MemTotal)
        memFreePct = float64(available) * 100.0 / total
        memUsedPct = float64(inUse) * 100.0 / total
    }

    var swapFreePct, swapUsedPct float64
    if info.SwapTotal != 0 {
        total := float64(info.SwapTotal)
        swapFreePct = float64(info.SwapFree) * 100.0 / total
        swapUsedPct = float64(info.SwapUsed) * 100.0 / total
    }

    return []*model.MetricValue{
        GaugeValue("mem.memtotal", info.MemTotal),          // total memory
        GaugeValue("mem.memused", inUse),                   // used memory
        GaugeValue("mem.memfree", available),               // available memory
        GaugeValue("mem.swaptotal", info.SwapTotal),        // total swap
        GaugeValue("mem.swapused", info.SwapUsed),          // used swap
        GaugeValue("mem.swapfree", info.SwapFree),          // free swap
        GaugeValue("mem.memfree.percent", memFreePct),      // available memory percentage
        GaugeValue("mem.memused.percent", memUsedPct),      // used memory percentage
        GaugeValue("mem.swapfree.percent", swapFreePct),    // free swap percentage
        GaugeValue("mem.swapused.percent", swapUsedPct),    // used swap percentage
    }
}
  1. DiskIOMetrics -- 磁盘IO信息采集
#引用"github.com/toolkits/nux"采集DISK信息
#实现读取与解析"/proc/diskstats"信息

// DiskIOMetrics reports the raw per-device counters returned by
// nux.ListDiskStats as disk.io.* counter metrics tagged "device=<name>".
// Devices rejected by ShouldHandleDevice are skipped. It logs the error and
// returns no metrics when the disk stats cannot be listed.
func DiskIOMetrics() (L []*model.MetricValue) {
    stats, err := nux.ListDiskStats()
    if err != nil {
        log.Println(err)
        return L
    }

    for _, stat := range stats {
        if ShouldHandleDevice(stat.Device) {
            tag := "device=" + stat.Device
            L = append(L,
                CounterValue("disk.io.read_requests", stat.ReadRequests, tag),            // read requests
                CounterValue("disk.io.read_merged", stat.ReadMerged, tag),                // merged adjacent reads
                CounterValue("disk.io.read_sectors", stat.ReadSectors, tag),              // sectors read
                CounterValue("disk.io.msec_read", stat.MsecRead, tag),                    // time spent reading
                CounterValue("disk.io.write_requests", stat.WriteRequests, tag),          // write requests
                CounterValue("disk.io.write_merged", stat.WriteMerged, tag),              // merged adjacent writes
                CounterValue("disk.io.write_sectors", stat.WriteSectors, tag),            // sectors written
                CounterValue("disk.io.msec_write", stat.MsecWrite, tag),                  // time spent writing
                CounterValue("disk.io.ios_in_progress", stat.IosInProgress, tag),         // I/Os currently in flight
                CounterValue("disk.io.msec_total", stat.MsecTotal, tag),                  // total time spent on I/O
                CounterValue("disk.io.msec_weighted_total", stat.MsecWeightedTotal, tag), // weighted I/O time (completion time plus backlog)
            )
        }
    }
    return L
}
  1. IOStatsMetrics -- 磁盘IO信息采集
# 在main入口后台线程已周期性收集磁盘IO状态,保存在私有diskStatsMap内
# go cron.InitDataHistory()-> UpdateDiskStats()
# 私有变量diskStatsMap保存周期性数据
# 解析 diskStatsMap 获取IO状态数据


// IOStatsMetrics derives iostat-style gauges for every device in
// diskStatsMap that passes ShouldHandleDevice. All values are computed from
// IODelta differences and tagged "device=<name>".
//
// dsLock is held for reading during the whole iteration.
//
// Per-request averages (avgrq_sz, await, svctm) are 0 when no I/O completed
// in the interval. disk.io.util is capped at 100 and is 0 when the elapsed
// time delta is 0, so the division can never produce NaN or +Inf.
func IOStatsMetrics() (L []*model.MetricValue) {
    dsLock.RLock()
    defer dsLock.RUnlock()

    for device := range diskStatsMap {
        if !ShouldHandleDevice(device) {
            continue
        }

        tags := "device=" + device
        readIOs := IODelta(device, IOReadRequests)
        writeIOs := IODelta(device, IOWriteRequests)
        readSectors := IODelta(device, IOReadSectors)
        writeSectors := IODelta(device, IOWriteSectors)
        readMsec := IODelta(device, IOMsecRead)
        writeMsec := IODelta(device, IOMsecWrite)
        busyMsec := IODelta(device, IOMsecTotal)
        totalIOs := readIOs + writeIOs

        avgrqSz := 0.0
        await := 0.0
        svctm := 0.0
        if totalIOs != 0 {
            avgrqSz = float64(readSectors+writeSectors) / float64(totalIOs)
            await = float64(readMsec+writeMsec) / float64(totalIOs)
            svctm = float64(busyMsec) / float64(totalIOs)
        }

        // NOTE(review): TS presumably yields a timestamp in milliseconds so
        // that busyMsec/duration is a ratio — confirm against IODelta/TS.
        duration := IODelta(device, TS)

        // Guard the division: a zero duration (e.g. no previous sample)
        // would otherwise yield NaN or +Inf.
        util := 0.0
        if duration != 0 {
            util = float64(busyMsec) * 100.0 / float64(duration)
            if util > 100.0 {
                util = 100.0
            }
        }

        L = append(L,
            GaugeValue("disk.io.read_bytes", float64(readSectors)*512.0, tags),                              // bytes read (sectors * 512)
            GaugeValue("disk.io.write_bytes", float64(writeSectors)*512.0, tags),                            // bytes written (sectors * 512)
            GaugeValue("disk.io.avgrq_sz", avgrqSz, tags),                                                   // average request size in sectors
            GaugeValue("disk.io.avgqu-sz", float64(IODelta(device, IOMsecWeightedTotal))/1000.0, tags),      // average request queue length
            GaugeValue("disk.io.await", await, tags),                                                        // average time per I/O request
            GaugeValue("disk.io.svctm", svctm, tags),                                                        // average device service time per I/O
            GaugeValue("disk.io.util", util, tags),                                                          // I/O busy time divided by elapsed time, in percent
        )
    }

    return L
}

  1. NetstatMetrics -- TCP信息采集
#引用"github.com/toolkits/nux"采集TCP信息
#实现为读"/proc/net/netstat"信息并解析

// NetstatMetrics reports the "TcpExt" entries returned by nux.Netstat as
// counters named "TcpExt.<key>". Only keys present in USES are reported.
// It logs the error and returns no metrics when the lookup fails.
func NetstatMetrics() (L []*model.MetricValue) {
    tcpExts, err := nux.Netstat("TcpExt")
    if err != nil {
        log.Println(err)
        return nil
    }

    // Ranging over an empty result appends nothing, so L stays nil.
    for name, value := range tcpExts {
        if _, wanted := USES[name]; wanted {
            L = append(L, CounterValue("TcpExt."+name, value))
        }
    }
    return L
}
  1. ProcMetrics -- 采集Proc相关信息
# main入口syncBuiltinMetrics()从HBS同步SetReportProcs()
# g.ReportProcs()获取Procs列表map
# 获取"/proc"目录下"/proc/%pid/cmdline"

// ProcMetrics reports, for every entry in g.ReportProcs(), how many of the
// processes returned by nux.AllProcs match it according to is_a. The count
// is emitted as a g.PROC_NUM gauge using the map key as the tag string.
//
// Nothing is collected when there are no configured entries; if the process
// list cannot be read, the error is logged and no metrics are returned.
func ProcMetrics() (L []*model.MetricValue) {
    wanted := g.ReportProcs()
    if len(wanted) == 0 {
        return L
    }

    procs, err := nux.AllProcs()
    if err != nil {
        log.Println(err)
        return L
    }

    for tags, matcher := range wanted {
        matched := 0
        for idx := range procs {
            if is_a(procs[idx], matcher) {
                matched++
            }
        }
        L = append(L, GaugeValue(g.PROC_NUM, matched, tags))
    }
    return L
}
  1. UdpMetrics UDP接收和发送数据报采集
# 实现获取"/proc/net/snmp"信息UDP接收和发送数据报

// UdpMetrics reports every "Udp" entry returned by nux.Snmp as a counter
// named "snmp.Udp.<key>". On failure it logs the error and returns an empty,
// non-nil slice.
func UdpMetrics() []*model.MetricValue {
    udp, err := nux.Snmp("Udp")
    if err != nil {
        log.Println("read snmp fail", err)
        return []*model.MetricValue{}
    }

    metrics := make([]*model.MetricValue, 0, len(udp))
    for name, value := range udp {
        metrics = append(metrics, CounterValue("snmp.Udp."+name, value))
    }
    return metrics
}
  1. DeviceMetrics -- 挂载设备信息采集
# 实现获取与解析"/proc/mounts"信息

// DeviceMetrics reports usage for the mount points returned by
// nux.ListMountPoint.
//
// When g.Config().Collector.MountPoint is non-empty, only the mount points
// listed there are considered. For each remaining mount point with a
// non-zero BlocksAll, df.bytes.* gauges are emitted, followed by df.inodes.*
// gauges when InodesAll is non-zero; all carry "mount=...,fstype=..." tags.
// After the loop, df.statistics.* gauges summarise the accumulated totals.
//
// A failure to list mount points is logged and yields no metrics; a failure
// on a single mount point is logged and that mount point is skipped.
func DeviceMetrics() (L []*model.MetricValue) {
    mountPoints, err := nux.ListMountPoint()
    if err != nil {
        log.Error("collect device metrics fail:", err)
        return L
    }

    // Optional allow-list of mount points taken from the configuration.
    allowed := make(map[string]struct{})
    for _, mp := range g.Config().Collector.MountPoint {
        allowed[mp] = struct{}{}
    }

    var totalAll, totalUsed uint64

    for _, entry := range mountPoints {
        fsSpec, fsFile, fsVfstype := entry[0], entry[1], entry[2]

        if len(allowed) > 0 {
            if _, listed := allowed[fsFile]; !listed {
                log.Debug("mount point not matched with config", fsFile, "ignored.")
                continue
            }
        }

        du, err := nux.BuildDeviceUsage(fsSpec, fsFile, fsVfstype)
        if err != nil {
            log.Error(err)
            continue
        }
        if du.BlocksAll == 0 {
            continue
        }

        totalAll += du.BlocksAll
        totalUsed += du.BlocksUsed

        tags := fmt.Sprintf("mount=%s,fstype=%s", du.FsFile, du.FsVfstype)
        L = append(L,
            GaugeValue("df.bytes.total", du.BlocksAll, tags),                // total size in bytes
            GaugeValue("df.bytes.used", du.BlocksUsed, tags),                // used size in bytes
            GaugeValue("df.bytes.free", du.BlocksFree, tags),                // free size in bytes
            GaugeValue("df.bytes.used.percent", du.BlocksUsedPercent, tags), // used percentage
            GaugeValue("df.bytes.free.percent", du.BlocksFreePercent, tags), // free percentage
        )

        if du.InodesAll != 0 {
            L = append(L,
                GaugeValue("df.inodes.total", du.InodesAll, tags),                // total inodes
                GaugeValue("df.inodes.used", du.InodesUsed, tags),                // used inodes
                GaugeValue("df.inodes.free", du.InodesFree, tags),                // free inodes
                GaugeValue("df.inodes.used.percent", du.InodesUsedPercent, tags), // used inode percentage
                GaugeValue("df.inodes.free.percent", du.InodesFreePercent, tags), // free inode percentage
            )
        }
    }

    if len(L) > 0 && totalAll > 0 {
        L = append(L,
            GaugeValue("df.statistics.total", float64(totalAll)),                                     // summed total
            GaugeValue("df.statistics.used", float64(totalUsed)),                                     // summed used
            GaugeValue("df.statistics.used.percent", float64(totalUsed)*100.0/float64(totalAll)),     // overall used percentage
        )
    }

    return L
}

  1. PortMetrics -- TCP / UDP 端口信息采集
# main入口syncBuiltinMetrics()-> g.SetReportPorts(ports)从HBS同步检测端口列表
# g.ReportPorts() 获取列表
# 实现获取TCP "ss -t -l -n" /获取UDP "ss -u -a -n"监听信息

// PortMetrics reports, for every port in g.ReportPorts(), a
// g.NET_PORT_LISTEN gauge tagged "port=<n>": 1 when the port appears in
// either nux.TcpPorts or nux.UdpPorts, otherwise 0.
//
// Nothing is collected when no ports are configured. If either port list
// cannot be obtained, the error is logged and no metrics are returned.
func PortMetrics() (L []*model.MetricValue) {
    wanted := g.ReportPorts()
    if len(wanted) == 0 {
        return L
    }

    tcpPorts, err := nux.TcpPorts()
    if err != nil {
        log.Println(err)
        return L
    }

    udpPorts, err := nux.UdpPorts()
    if err != nil {
        log.Println(err)
        return L
    }

    for _, port := range wanted {
        listening := 0
        if slice.ContainsInt64(tcpPorts, port) || slice.ContainsInt64(udpPorts, port) {
            listening = 1
        }
        L = append(L, GaugeValue(g.NET_PORT_LISTEN, listening, fmt.Sprintf("port=%d", port)))
    }

    return L
}
  1. SocketStatSummaryMetrics -- socket统计信息采集
# 实现获取socket统计信息 "ss -s" 
// SocketStatSummaryMetrics reports every entry returned by
// nux.SocketStatSummary as a gauge named "ss.<key>". It logs the error and
// returns no metrics when the summary cannot be obtained.
func SocketStatSummaryMetrics() (L []*model.MetricValue) {
    summary, err := nux.SocketStatSummary()
    if err != nil {
        log.Println(err)
        return nil
    }

    for name, value := range summary {
        L = append(L, GaugeValue("ss."+name, value))
    }
    return L
}
  1. DuMetrics

# main入口syncBuiltinMetrics()从HBS同步g.SetDuPaths(paths)
# g.DuPaths()获取需要统计占用空间大小的路径列表(对每个路径执行"du -bs")

// DuMetrics runs "du -bs <path>" concurrently for every path in g.DuPaths()
// and reports one g.DU_BS gauge per path, tagged "path=<path>". The value is
// the size parsed from du's output, or -1 when the measurement failed (the
// failure is logged).
//
// Each goroutine receives its path as an argument rather than capturing the
// loop variable, so the reported tag always matches the measured path.
func DuMetrics() (L []*model.MetricValue) {
    paths := g.DuPaths()
    // Every goroutine sends exactly one value, so a buffer of len(paths)
    // guarantees that no send ever blocks.
    results := make(chan *model.MetricValue, len(paths))
    var wg sync.WaitGroup

    for _, path := range paths {
        wg.Add(1)
        go func(duPath string) {
            defer wg.Done()

            size, err := duSize(duPath)
            if err != nil {
                log.Println(err)
                results <- GaugeValue(g.DU_BS, -1, "path="+duPath)
                return
            }
            results <- GaugeValue(g.DU_BS, size, "path="+duPath)
        }(path)
    }
    wg.Wait()
    close(results)

    for metric := range results {
        L = append(L, metric)
    }
    return L
}

// duSize executes "du -bs path" and returns the size printed in the first
// output field. It fails when the command cannot be started, times out,
// writes anything to stderr, exits with an error, or prints output that does
// not contain a parsable size followed by at least one more field.
func duSize(path string) (uint64, error) {
    // Note: macOS du does not support -b.
    cmd := exec.Command("du", "-bs", path)
    var stdout, stderr bytes.Buffer
    cmd.Stdout = &stdout
    cmd.Stderr = &stderr

    if err := cmd.Start(); err != nil {
        return 0, err
    }

    // timeout is defined elsewhere in the package and is interpreted here
    // as a number of seconds.
    err, isTimeout := sys.CmdRunWithTimeout(cmd, time.Duration(timeout)*time.Second)
    if isTimeout {
        return 0, fmt.Errorf("exec cmd : du -bs %s timeout", path)
    }

    // Anything on stderr is treated as a failure, even with a zero exit code.
    if errStr := stderr.String(); errStr != "" {
        return 0, errors.New(errStr)
    }

    if err != nil {
        return 0, fmt.Errorf("du -bs %s failed: %s", path, err.Error())
    }

    fields := strings.Fields(stdout.String())
    if len(fields) < 2 {
        return 0, fmt.Errorf("du -bs %s failed: %s", path, "return fields < 2")
    }

    size, err := strconv.ParseUint(fields[0], 10, 64)
    if err != nil {
        return 0, fmt.Errorf("cannot parse du -bs %s output", path)
    }
    return size, nil
}
  1. UrlMetrics -- URL健康检测采集
 // UrlMetrics probes every URL in g.ReportUrls() with probeUrl and reports a
 // g.URL_CHECK_HEALTH gauge per URL: 1 when the probe succeeds, 0 otherwise.
 // Each metric is tagged with the URL, its timeout and the local hostname;
 // the hostname tag is "None" when g.Hostname() fails.
 func UrlMetrics() (L []*model.MetricValue) {
     // URL list (URL -> timeout) as returned by g.ReportUrls().
     targets := g.ReportUrls()
     if len(targets) == 0 {
         return L
     }

     hostname, err := g.Hostname()
     if err != nil {
         hostname = "None"
     }

     for furl, timeout := range targets {
         tags := fmt.Sprintf("url=%v,timeout=%v,src=%v", furl, timeout, hostname)

         // The probe error is deliberately ignored; only the boolean
         // outcome is turned into a metric.
         healthy := 0
         if ok, _ := probeUrl(furl, timeout); ok {
             healthy = 1
         }
         L = append(L, GaugeValue(g.URL_CHECK_HEALTH, healthy, tags))
     }
     return L
 }

//通过调用系统命令执行Curl命令进行Http_code的状态检测
// probeUrl checks a URL by running curl (header-only request, output
// discarded, "-m" set to timeout) and reading the HTTP status code that curl
// prints via "-w %{http_code}".
//
// It returns (true, nil) only when the first output line, trimmed, is "200".
// If running curl or reading its output fails, it returns false with that
// error. A status other than 200 is logged and returned as (false, nil).
func probeUrl(furl string, timeout string) (bool, error) {
    out, err := sys.CmdOutBytes("curl", "--max-filesize", "102400", "-I", "-m", timeout, "-o", "/dev/null", "-s", "-w", "%{http_code}", furl)
    if err != nil {
        log.Printf("probe url [%v] failed.the err is: [%v]\n", furl, err)
        return false, err
    }

    statusLine, err := file.ReadLine(bufio.NewReader(bytes.NewBuffer(out)))
    if err != nil {
        log.Println("read retcode failed.err is:", err)
        return false, err
    }

    if strings.TrimSpace(string(statusLine)) == "200" {
        return true, nil
    }

    log.Printf("return code [%v] is not 200.query url is [%v]", string(statusLine), furl)
    return false, nil
}

  1. GpuMetrics -- GPU信息采集

// 引用外部库"github.com/mindprince/gonvml"
// 需要load libnvidia-ml.so.1库
// GpuMetrics reports per-device GPU gauges through gonvml, tagged
// "uuid=<uuid>", followed by gpu.count, gpu.util.avg and
// gpu.memory.util.avg.
//
// If gonvml cannot be initialised nothing is collected (the error is logged
// only in debug mode). Fan speed and power usage are optional: when either
// read fails, the error is logged and only that metric is omitted for the
// device. A failure reading temperature, memory info or utilization rates
// skips the remaining metrics of that device.
//
// All readings are scoped to the loop iteration, so a failed read can never
// re-emit a value left over from a previous device.
func GpuMetrics() (L []*model.MetricValue) {
    if err := gonvml.Initialize(); err != nil {
        if g.Config().Debug {
            log.Println("Initialize error: ", err)
        }
        return
    }
    defer gonvml.Shutdown()

    count, err := gonvml.DeviceCount()
    if err != nil {
        log.Println("DeviceCount error: ", err)
        return
    }
    if count == 0 {
        return
    }

    allUtilization := uint(0)
    allMemoryUtilization := uint(0)

    for i := 0; i < int(count); i++ {
        dev, err := gonvml.DeviceHandleByIndex(uint(i))
        if err != nil {
            log.Println("DeviceHandleByIndex error:", err)
            continue
        }

        // A UUID failure is logged but not fatal; the tag then carries
        // whatever string UUID returned.
        uuid, err := dev.UUID()
        if err != nil {
            log.Println("dev.UUID error", err)
        }
        tag := "uuid=" + uuid

        // Not every GPU has a fan, so this metric is optional.
        fanSpeed, err := dev.FanSpeed()
        if err != nil {
            log.Println("dev.FanSpeed error: ", err)
        } else {
            L = append(L, GaugeValue("gpu.fan.speed", fanSpeed, tag))
        }

        temperature, err := dev.Temperature()
        if err != nil {
            log.Println("dev.Temperature error: ", err)
            continue
        }

        totalMemory, usedMemory, err := dev.MemoryInfo()
        if err != nil {
            log.Println("dev.MemoryInfo error: ", err)
            continue
        }

        // Convert to megabytes; converting to float64 before dividing
        // keeps the fractional part.
        totalMB := float64(totalMemory) / 1024 / 1024
        usedMB := float64(usedMemory) / 1024 / 1024

        gpuUtilization, memoryUtilization, err := dev.UtilizationRates()
        if err != nil {
            log.Println("dev.UtilizationRates error: ", err)
            continue
        }

        allUtilization += gpuUtilization
        allMemoryUtilization += memoryUtilization

        L = append(L, GaugeValue("gpu.temperature", temperature, tag))
        L = append(L, GaugeValue("gpu.memory.total", totalMB, tag))
        L = append(L, GaugeValue("gpu.memory.used", usedMB, tag))
        L = append(L, GaugeValue("gpu.memory.util", memoryUtilization, tag))
        L = append(L, GaugeValue("gpu.util", gpuUtilization, tag))

        // Power is optional: report it only when the read succeeded.
        powerUsage, err := dev.PowerUsage()
        if err != nil {
            log.Println("dev.PowerUsage error: ", err)
        } else {
            // Divide by 1000 to convert to watts, keeping the fractional part.
            L = append(L, GaugeValue("gpu.power.usage", float64(powerUsage)/1000, tag))
        }
    }

    // The averages divide by the total device count, including devices
    // whose readings were skipped above.
    L = append(L, GaugeValue("gpu.count", count))
    L = append(L, GaugeValue("gpu.util.avg", allUtilization/count))
    L = append(L, GaugeValue("gpu.memory.util.avg", allMemoryUtilization/count))
    return L
}

四 思考与查证

  1. 请查看源码,查证Agent所关联的组件HBS/Transfer哪个是集群模式?哪个是单点模式?为什么?
  2. 如果需要自定义Agent的采集数据有哪几种方式?从源码角度又应该如何添加相应的采集Code?
  3. 聊一聊从源码角度Agent组件哪些代码技术点或模式点值得你好好学习与借鉴的?

相关文章

网友评论

    本文标题:OpenFalcon源码分析(Agent组件)

    本文链接:https://www.haomeiwen.com/subject/ypbhnftx.html