一 基本说明
Agent VERSION "5.1.2"
- Agent源代码文件目录说明
-
调用关系
调用关系 -
组件交互关系
组件交互关系
二 源码分析
Main 入口函数
# 命令行参数解析
cfg := flag.String("c", "cfg.json", "configuration file")
version := flag.Bool("v", false, "show version")
check := flag.Bool("check", false, "check collector")
flag.Parse()
# 命令行版本查看
if *version {
fmt.Println(g.VERSION)
os.Exit(0)
}
# 命令行收集器检测
if *check {
funcs.CheckCollector() #【参考相关详细分析】
os.Exit(0)
}
# Agent配置文件解析,g.Config()可获取完整配置
g.ParseConfig(*cfg) #【参考相关详细分析】
if g.Config().Debug {
g.InitLog("debug")
} else {
g.InitLog("info")
}
# 全局初始化工作(工作目录/本地IP/RPC_Client)
g.InitRootDir() #【参考相关详细分析】
g.InitLocalIp() #【参考相关详细分析】
g.InitRpcClients() #【参考相关详细分析】
# 采集功能与模块构造映射Map
funcs.BuildMappers() #【参考相关详细分析】
#
go cron.InitDataHistory() #【参考相关详细分析】
# 上报Agent自身状态
cron.ReportAgentStatus() #【参考相关详细分析】
# 同步模块
cron.SyncMinePlugins() #【参考相关详细分析】
cron.SyncBuiltinMetrics() #【参考相关详细分析】
cron.SyncTrustableIps() #【参考相关详细分析】
# 采集数据
cron.Collect() #【参考相关详细分析】
# API服务器运行
go http.Start() #【参考相关详细分析】
# 阻塞主进程
select {}
funcs.CheckCollector() 各采集模块功能状态检测
func CheckCollector() {
output := make(map[string]bool)
# 引用外部"github.com/toolkits/nux"功能获取
# CurrentProcStat()
# ListDiskStats()
# ListeningPorts()
# AllProcs()
_, procStatErr := nux.CurrentProcStat()
_, listDiskErr := nux.ListDiskStats()
ports, listeningPortsErr := nux.ListeningPorts()
procs, psErr := nux.AllProcs()
# 引用外部"github.com/toolkits/sys"功能执行模块Du系统命令
_, duErr := sys.CmdOut("du", "--help")
# 各采集功能模块采集调用与结果判断(各模块采集代码实现可进一步分析)
output["kernel "] = len(KernelMetrics()) > 0
output["df.bytes"] = DeviceMetricsCheck()
output["net.if "] = len(CoreNetMetrics([]string{})) > 0
output["loadavg "] = len(LoadAvgMetrics()) > 0
output["cpustat "] = procStatErr == nil
output["disk.io "] = listDiskErr == nil
output["memory "] = len(MemMetrics()) > 0
output["netstat "] = len(NetstatMetrics()) > 0
output["ss -s "] = len(SocketStatSummaryMetrics()) > 0
output["ss -tln "] = listeningPortsErr == nil && len(ports) > 0
output["ps aux "] = psErr == nil && len(procs) > 0
output["du -bs "] = duErr == nil
# 将MAP output的各值进行结果输出,True为OK,False为fail打印显示
for k, v := range output {
status := "fail"
if v {
status = "ok"
}
fmt.Println(k, "...", status)
}
}
g.ParseConfig(*cfg) Agent配置文件解析
#通过命令指定配置文件,默认为同级目录下cfg.json
cfg := flag.String("c", "cfg.json", "configuration file")
# ParseConfig()解析cfg配置文件并保存在私有变量config内,可通过
# Config()公开方法来获取GlobalConfig结构
# Config().XXX 常用来获取单个配置项
// ParseConfig reads the JSON configuration file at path cfg, decodes it into
// a GlobalConfig and installs it as the value returned by Config().
//
// An empty path, a missing file, a read error or a JSON decoding error is
// reported through log.Fatalln. ConfigFile is set to cfg as soon as the file
// is known to exist, before it is read.
func ParseConfig(cfg string) {
	if cfg == "" {
		log.Fatalln("use -c to specify configuration file")
	}
	if exists := file.IsExist(cfg); !exists {
		log.Fatalln("config file:", cfg, "is not existent. maybe you need `mv cfg.example.json cfg.json`")
	}

	ConfigFile = cfg

	raw, readErr := file.ToTrimString(cfg)
	if readErr != nil {
		log.Fatalln("read config file:", cfg, "fail:", readErr)
	}

	parsed := new(GlobalConfig)
	if parseErr := json.Unmarshal([]byte(raw), parsed); parseErr != nil {
		log.Fatalln("parse config file:", cfg, "fail:", parseErr)
	}

	// Swap the shared pointer under the write lock; the lock is released
	// only after the success message has been logged.
	lock.Lock()
	defer lock.Unlock()
	config = parsed
	log.Println("read config file:", cfg, "successfully")
}
#公开方法返回config
// Config returns the configuration most recently installed by ParseConfig.
// The pointer is read under the read lock.
func Config() *GlobalConfig {
	lock.RLock()
	current := config
	lock.RUnlock()
	return current
}
#GlobalConfig结构化配置定义
// GlobalConfig is the decoded form of the agent's JSON configuration file.
// ParseConfig fills it in and Config() hands it out.
type GlobalConfig struct {
	Debug         bool              `json:"debug"`        // main selects the "debug" log level when true, "info" otherwise
	Hostname      string            `json:"hostname"`     // presumably an override for the reported hostname — confirm against g.Hostname()
	IP            string            `json:"ip"`           // presumably an override for the reported IP — confirm against g.IP()
	Plugin        *PluginConfig     `json:"plugin"`       // plugin settings (Enabled, Dir and LogDir are read in this file)
	Heartbeat     *HeartbeatConfig  `json:"heartbeat"`    // heartbeat server settings (Enabled, Addr, Interval, Timeout)
	Transfer      *TransferConfig   `json:"transfer"`     // transfer settings (Enabled, Addrs, Interval, Timeout)
	Http          *HttpConfig       `json:"http"`         // built-in HTTP server settings (Enabled, Listen)
	Collector     *CollectorConfig  `json:"collector"`    // collector settings (IfacePrefix, MountPoint)
	DefaultTags   map[string]string `json:"default_tags"` // key=value tags SendToTransfer adds to every metric
	IgnoreMetrics map[string]bool   `json:"ignore"`       // metric names mapped to true are dropped by collect
}
g 全局化初始化
g.InitRootDir() // working directory
g.InitLocalIp() // local IP
g.InitRpcClients() // instantiate the RPC client
# 通过os.Getwd()方式获取当前工作目录
// Root is the process working directory recorded by InitRootDir.
var Root string

// InitRootDir stores the current working directory (os.Getwd) in Root.
// A failure to obtain it is reported through log.Fatalln.
func InitRootDir() {
	wd, err := os.Getwd()
	Root = wd
	if err != nil {
		log.Fatalln("getwd fail:", err)
	}
}
# 通过net.DialTimeout连接心跳服务器(Heartbeat.Addr),取该连接的本地地址作为本机IP
// LocalIp is the host part of the local address of the probe connection made
// by InitLocalIp. It stays empty when heartbeat is disabled or the probe
// fails.
var LocalIp string

// InitLocalIp determines which local IP address this host uses to reach the
// heartbeat server: it opens a TCP connection to Config().Heartbeat.Addr
// (10 second timeout) and stores the host part of the connection's local
// address in LocalIp.
//
// Failures are only logged; LocalIp is left unchanged in that case.
func InitLocalIp() {
	if !Config().Heartbeat.Enabled {
		log.Println("hearbeat is not enabled, can't get localip")
		return
	}

	conn, err := net.DialTimeout("tcp", Config().Heartbeat.Addr, time.Second*10)
	if err != nil {
		log.Println("get local addr failed !")
		return
	}
	defer conn.Close()

	// Splitting on ":" and taking the first field only works for IPv4
	// ("1.2.3.4:5678"); an IPv6 local address ("[::1]:5678") would yield "[".
	// SplitHostPort handles both forms.
	host, _, err := net.SplitHostPort(conn.LocalAddr().String())
	if err != nil {
		log.Println("get local addr failed !")
		return
	}
	LocalIp = host
}
# 构造&SingleConnRpcClient{}对象
// HbsClient is the RPC client for the heartbeat server. InitRpcClients leaves
// it nil when heartbeat is disabled.
var HbsClient *SingleConnRpcClient

// InitRpcClients creates HbsClient from the heartbeat configuration: the
// server address is Heartbeat.Addr and the timeout is Heartbeat.Timeout
// interpreted as milliseconds. It does nothing when heartbeat is disabled.
func InitRpcClients() {
	if !Config().Heartbeat.Enabled {
		return
	}

	server := Config().Heartbeat.Addr
	timeout := time.Duration(Config().Heartbeat.Timeout) * time.Millisecond
	HbsClient = &SingleConnRpcClient{RpcServer: server, Timeout: timeout}
}
// SingleConnRpcClient is an RPC client bound to one server address. It embeds
// a sync.Mutex, so values must be shared by pointer and never copied.
type SingleConnRpcClient struct {
	sync.Mutex
	rpcClient *rpc.Client   // underlying net/rpc client; presumably created on demand by Call — not shown here
	RpcServer string        // address of the RPC server
	Timeout   time.Duration // RPC timeout; the constructors in this file build it from a millisecond config value
}
funcs.BuildMappers() 构建采集功能映射表,表内每项代表一个采集项
【重要模块:采集映射表】
func BuildMappers() {
interval := g.Config().Transfer.Interval
Mappers = []FuncsAndInterval{
{ # 基础项采集
Fs: []func() []*model.MetricValue{
AgentMetrics, // Agent状态
CpuMetrics, // CPU
NetMetrics, // NET
KernelMetrics, // Kernel
LoadAvgMetrics, // CPU Load Avg
MemMetrics, // Memory
DiskIOMetrics, // Disk IO
IOStatsMetrics, // IO stats
NetstatMetrics, // NetStat
ProcMetrics, // Proc
UdpMetrics, // Udp
},
Interval: interval,
},
{ # 设备采集
Fs: []func() []*model.MetricValue{
DeviceMetrics,
},
Interval: interval,
},
{ # Socket与port采集
Fs: []func() []*model.MetricValue{
PortMetrics,
SocketStatSummaryMetrics,
},
Interval: interval,
},
{ # Du磁盘采集
Fs: []func() []*model.MetricValue{
DuMetrics,
},
Interval: interval,
},
{ # Url采集
Fs: []func() []*model.MetricValue{
UrlMetrics,
},
Interval: interval,
},
{ # Gpu采集
Fs: []func() []*model.MetricValue{
GpuMetrics,
},
Interval: interval,
},
}
}
# 单个采集功能项
// FuncsAndInterval is one entry of the collector table: a group of collector
// functions that are run together at the same interval.
type FuncsAndInterval struct {
	Fs []func() []*model.MetricValue // collector functions in this group
	Interval int // collection interval; Collect treats it as seconds
}
# 完整映射列表
// Mappers is the full collector table; BuildMappers assigns it and Collect
// iterates over it.
var Mappers []FuncsAndInterval
cron.InitDataHistory() 周期性更新CPU/Disk状态,构建历史数据
func InitDataHistory() {
for {
funcs.UpdateCpuStat()
funcs.UpdateDiskStats()
time.Sleep(g.COLLECT_INTERVAL)
}
}
# funcs->cpustat更新CPU状态,最新状态保存在procStatHistory[0]
// UpdateCpuStat takes a fresh CPU sample with nux.CurrentProcStat and pushes
// it onto procStatHistory: existing samples move one slot towards the end
// (the last one is dropped) and the new sample is stored at index 0.
// It returns the sampling error, if any, without touching the history.
func UpdateCpuStat() error {
	latest, err := nux.CurrentProcStat()
	if err != nil {
		return err
	}

	psLock.Lock()
	defer psLock.Unlock()

	// Shift from the back so no slot is overwritten before it is copied.
	for slot := historyCount - 1; slot >= 1; slot-- {
		procStatHistory[slot] = procStatHistory[slot-1]
	}
	procStatHistory[0] = latest
	return nil
}
# funcs->diskstats更新Disk状态
// UpdateDiskStats takes a fresh disk sample with nux.ListDiskStats and, for
// every device, stores the pair {new sample, previous newest sample} in
// diskStatsMap. For a device seen for the first time the second element is
// nil. It returns the sampling error, if any, without touching the map.
func UpdateDiskStats() error {
	current, err := nux.ListDiskStats()
	if err != nil {
		return err
	}

	dsLock.Lock()
	defer dsLock.Unlock()

	for _, stat := range current {
		previous := diskStatsMap[stat.Device][0]
		diskStatsMap[stat.Device] = [2]*nux.DiskStats{stat, previous}
	}
	return nil
}
cron.ReportAgentStatus() 周期性向HBS服务器上报状态
// ReportAgentStatus starts the background status-report loop when heartbeat
// is enabled and a heartbeat address is configured; otherwise it does
// nothing. The report period is Heartbeat.Interval seconds.
func ReportAgentStatus() {
	if !g.Config().Heartbeat.Enabled || g.Config().Heartbeat.Addr == "" {
		return
	}
	period := time.Duration(g.Config().Heartbeat.Interval) * time.Second
	go reportAgentStatus(period)
}
# 通过RPC方式上报Agent信息
// reportAgentStatus never returns: every interval it sends the agent's
// hostname, IP, agent version and plugin version to the heartbeat server via
// the "Agent.ReportStatus" RPC. If the hostname cannot be determined, the
// text "error:<message>" is sent in its place. RPC failures and non-zero
// response codes are logged.
func reportAgentStatus(interval time.Duration) {
	for ; ; time.Sleep(interval) {
		hostname, err := g.Hostname()
		if err != nil {
			hostname = "error:" + err.Error()
		}

		var req model.AgentReportRequest
		req.Hostname = hostname
		req.IP = g.IP()
		req.AgentVersion = g.VERSION
		req.PluginVersion = g.GetCurrPluginVersion()

		var resp model.SimpleRpcResponse
		err = g.HbsClient.Call("Agent.ReportStatus", req, &resp)
		if failed := err != nil || resp.Code != 0; failed {
			log.Println("call Agent.ReportStatus fail:", err, "Request:", req, "Response:", resp)
		}
	}
}
// GetCurrPluginVersion returns the version of the plugin directory, obtained
// by running "git rev-parse HEAD" inside Config().Plugin.Dir.
//
// Instead of a hash it returns "plugin not enabled" when plugins are
// disabled, "plugin dir not existent" when the directory is missing, or
// "Error:<message>" when the git command fails.
func GetCurrPluginVersion() string {
	if enabled := Config().Plugin.Enabled; !enabled {
		return "plugin not enabled"
	}

	dir := Config().Plugin.Dir
	if !file.IsExist(dir) {
		return "plugin dir not existent"
	}

	var head bytes.Buffer
	revParse := exec.Command("git", "rev-parse", "HEAD")
	revParse.Dir = dir // run inside the plugin directory
	revParse.Stdout = &head

	if err := revParse.Run(); err != nil {
		return "Error:" + err.Error()
	}
	return strings.TrimSpace(head.String())
}
cron.SyncMinePlugins() 周期性向HBS服务器同步插件列表,按调度执行插件采集数据,并向Transfer上报数据。【重要功能模块:插件管理】
# 公开同步插件方法
// SyncMinePlugins starts the background plugin-synchronisation loop. It does
// nothing when plugins are disabled, heartbeat is disabled, or no heartbeat
// address is configured.
func SyncMinePlugins() {
	switch {
	case !g.Config().Plugin.Enabled,
		!g.Config().Heartbeat.Enabled,
		g.Config().Heartbeat.Addr == "":
		return
	}
	go syncMinePlugins()
}
## RPC向HBS同步插件
// syncMinePlugins never returns: every Heartbeat.Interval seconds it asks the
// heartbeat server ("Agent.MinePlugins" RPC) which plugin directories this
// host should run. A response whose timestamp is not newer than the last
// accepted one is ignored. Otherwise the plugins found under the returned
// directories become the desired set: plugins outside it are removed and new
// or changed ones are added and scheduled.
func syncMinePlugins() {
	lastSeen := int64(-1)
	interval := time.Duration(g.Config().Heartbeat.Interval) * time.Second

	for {
		time.Sleep(interval)

		hostname, err := g.Hostname()
		if err != nil {
			continue
		}

		var resp model.AgentPluginsResponse
		req := model.AgentHeartbeatRequest{Hostname: hostname}
		if err = g.HbsClient.Call("Agent.MinePlugins", req, &resp); err != nil {
			log.Println("ERROR:", err)
			continue
		}

		if resp.Timestamp <= lastSeen {
			continue
		}
		dirs := resp.Plugins
		lastSeen = resp.Timestamp

		if g.Config().Debug {
			log.Println(&resp)
		}

		if len(dirs) == 0 {
			plugins.ClearAllPlugins()
		}

		// Flatten the plugins found under every directory into one map.
		wanted := make(map[string]*plugins.Plugin)
		for _, dir := range dirs {
			for key, plugin := range plugins.ListPlugins(strings.Trim(dir, "/")) {
				wanted[key] = plugin
			}
		}

		plugins.DelNoUsePlugins(wanted) // drop what is no longer wanted
		plugins.AddNewPlugins(wanted)   // add and schedule what is new or changed
	}
}
### plugins->plugins清理Agent插件(HBS服务器已不存在的插件)
// DelNoUsePlugins removes every registered plugin that is either missing from
// newPlugins or present there with a different MTime.
func DelNoUsePlugins(newPlugins map[string]*Plugin) {
	for key, running := range Plugins {
		if wanted, found := newPlugins[key]; found && wanted.MTime == running.MTime {
			continue // unchanged, keep it
		}
		deletePlugin(key)
	}
}
### plugins->plugins更新和添加插件
// AddNewPlugins registers and schedules every plugin in newPlugins that is
// not already registered with the same MTime. Each such plugin gets a new
// scheduler, which is recorded in PluginsWithScheduler and started.
func AddNewPlugins(newPlugins map[string]*Plugin) {
	for key, wanted := range newPlugins {
		running, found := Plugins[key]
		if found && wanted.MTime == running.MTime {
			continue // already registered and unchanged
		}

		Plugins[key] = wanted
		scheduler := NewPluginScheduler(wanted)
		PluginsWithScheduler[key] = scheduler
		scheduler.Schedule() // starts the ticker-driven run loop
	}
}
### 公有插件变量
// Package-level plugin registries. Both maps use the same keys as the map
// passed to AddNewPlugins.
var (
	// Plugins holds the currently registered plugins.
	Plugins = make(map[string]*Plugin)
	// PluginsWithScheduler holds the scheduler created for each registered plugin.
	PluginsWithScheduler = make(map[string]*PluginScheduler)
)
#### plugins->scheduler插件调度
// Schedule starts a goroutine that runs the scheduler's plugin on every tick
// of its Ticker. The goroutine stops the ticker and exits when a value is
// received from Quit.
func (s *PluginScheduler) Schedule() {
	loop := func() {
		for {
			select {
			case <-s.Quit:
				s.Ticker.Stop()
				return
			case <-s.Ticker.C:
				PluginRun(s.Plugin)
			}
		}
	}
	go loop()
}
##### plugins->scheduler插件执行
// PluginRun executes one plugin and forwards its output to transfer.
//
// The executable is Config().Plugin.Dir joined with plugin.FilePath. It is
// started in its own process group and given plugin.Cycle*1000-500
// milliseconds to finish. Anything it writes to stderr is saved to
// "<Plugin.LogDir>/<FilePath>.stderr.log". If the plugin is missing, fails to
// start, times out or exits with an error, that is logged and nothing is
// sent. Otherwise stdout is decoded as a JSON array of metrics and passed to
// g.SendToTransfer; blank stdout sends nothing.
func PluginRun(plugin *Plugin) {
	// Run budget in milliseconds; presumably Cycle is in seconds and the
	// 500ms margin keeps a run from overlapping the next tick — confirm.
	timeout := plugin.Cycle*1000 - 500
	fpath := filepath.Join(g.Config().Plugin.Dir, plugin.FilePath)

	if !file.IsExist(fpath) {
		log.Println("no such plugin:", fpath)
		return
	}

	debug := g.Config().Debug
	if debug {
		log.Println(fpath, "running...")
	}

	cmd := exec.Command(fpath)
	var stdout bytes.Buffer
	cmd.Stdout = &stdout
	var stderr bytes.Buffer
	cmd.Stderr = &stderr
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

	if err := cmd.Start(); err != nil {
		log.Printf("[ERROR] plugin start fail, error: %s\n", err)
		return
	}
	if debug {
		log.Println("plugin started:", fpath)
	}

	runErr, isTimeout := sys.CmdRunWithTimeout(cmd, time.Duration(timeout)*time.Millisecond)

	if errStr := stderr.String(); errStr != "" {
		logFile := filepath.Join(g.Config().Plugin.LogDir, plugin.FilePath+".stderr.log")
		// The write error gets its own variable. Reusing the run error here
		// would overwrite it, and a failed plugin would then be treated as
		// successful whenever its stderr log was written without error.
		if _, writeErr := file.WriteString(logFile, errStr); writeErr != nil {
			log.Printf("[ERROR] write log to %s fail, error: %s\n", logFile, writeErr)
		}
	}

	if isTimeout {
		// The process was killed after the timeout; runErr reports whether
		// the kill itself succeeded.
		if runErr == nil && debug {
			log.Println("[INFO] timeout and kill process", fpath, "successfully")
		}
		if runErr != nil {
			log.Println("[ERROR] kill process", fpath, "occur error:", runErr)
		}
		return
	}

	if runErr != nil {
		log.Println("[ERROR] exec plugin", fpath, "fail. error:", runErr)
		return
	}

	// The plugin ran successfully.
	data := stdout.Bytes()
	if len(data) == 0 {
		if debug {
			log.Println("[DEBUG] stdout of", fpath, "is blank")
		}
		return
	}

	var metrics []*model.MetricValue
	if err := json.Unmarshal(data, &metrics); err != nil {
		log.Printf("[ERROR] json.Unmarshal stdout of %s fail. error:%s stdout: \n%s\n", fpath, err, stdout.String())
		return
	}

	g.SendToTransfer(metrics)
}
g.SendToTransfer() 向transfer上报数据
#g->var向transfer服务器上报数据
// SendToTransfer adds the configured default tags to every metric and sends
// the batch to a transfer server via SendMetrics. An empty batch is ignored.
//
// The default tags are rendered as "k=v" pairs joined by ",". A metric with
// no tags receives exactly that string; otherwise it is appended after a ",".
// The metrics are modified in place. In debug mode the first metric of the
// batch and the transfer response are logged.
func SendToTransfer(metrics []*model.MetricValue) {
	if len(metrics) == 0 {
		return
	}

	if dt := Config().DefaultTags; len(dt) > 0 {
		pairs := make([]string, 0, len(dt))
		for key, value := range dt {
			pairs = append(pairs, key+"="+value)
		}
		defaultTags := strings.Join(pairs, ",")

		for _, m := range metrics {
			if m.Tags == "" {
				m.Tags = defaultTags
				continue
			}
			m.Tags = m.Tags + "," + defaultTags
		}
	}

	debug := Config().Debug
	if debug {
		log.Printf("=> <Total=%d> %v\n", len(metrics), metrics[0])
	}

	var resp model.TransferResponse
	SendMetrics(metrics, &resp)

	if debug {
		log.Println("<=", &resp)
	}
}
# g->transfer向transfer服务器发送采集数据
// SendMetrics sends metrics to one transfer server. The configured addresses
// are tried in random order until one "Transfer.Update" call succeeds; resp
// receives that server's reply. If every address fails, the function returns
// without a reply.
func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) {
	// Read the address list once. Config() returns whatever configuration is
	// current, and the /config/reload handler can replace it at any time.
	// Re-reading the list inside the loop could index a shorter list than the
	// one the permutation was built for, and panic.
	addrs := Config().Transfer.Addrs

	rand.Seed(time.Now().UnixNano())
	for _, i := range rand.Perm(len(addrs)) {
		addr := addrs[i]

		// Reuse the client for this address, creating it on first use.
		c := getTransferClient(addr)
		if c == nil {
			c = initTransferClient(addr)
		}

		if updateMetrics(c, metrics, resp) {
			break
		}
	}
}
## 获取可用的TransferClient对象
// getTransferClient returns the client registered for addr in
// TransferClients, or nil when there is none. The map is read under the read
// lock.
func getTransferClient(addr string) *SingleConnRpcClient {
	TransferClientsLock.RLock()
	client := TransferClients[addr] // missing key yields a nil pointer
	TransferClientsLock.RUnlock()
	return client
}
## 构建一个新的TransferClient对象
// initTransferClient returns the client for the transfer server at addr,
// creating and registering it in TransferClients if none exists yet. A new
// client uses Config().Transfer.Timeout (milliseconds) as its timeout.
//
// Metrics are sent from several goroutines (one per collector group plus the
// plugin runners), so two of them can both find no client for addr and both
// call this function. The existence check is repeated under the write lock so
// the second caller gets the first caller's client instead of replacing it.
func initTransferClient(addr string) *SingleConnRpcClient {
	timeout := time.Duration(Config().Transfer.Timeout) * time.Millisecond

	TransferClientsLock.Lock()
	defer TransferClientsLock.Unlock()

	if existing, ok := TransferClients[addr]; ok && existing != nil {
		return existing
	}

	c := &SingleConnRpcClient{
		RpcServer: addr,
		Timeout:   timeout,
	}
	TransferClients[addr] = c
	return c
}
## RPC向Transfer服务器上报metrics数据,RPC类型"Transfer.Update"
// updateMetrics sends metrics through c with the "Transfer.Update" RPC and
// reports whether the call succeeded. A failure is logged.
func updateMetrics(c *SingleConnRpcClient, metrics []*model.MetricValue, resp *model.TransferResponse) bool {
	if err := c.Call("Transfer.Update", metrics, resp); err != nil {
		log.Println("call Transfer.Update fail:", c, err)
		return false
	}
	return true
}
cron.SyncBuiltinMetrics() 向HBS服务器同步监控端口、du路径、进程和URL
// SyncBuiltinMetrics starts the background loop that synchronises the
// built-in metric settings from the heartbeat server. It does nothing unless
// heartbeat is enabled and a heartbeat address is configured.
func SyncBuiltinMetrics() {
	if !g.Config().Heartbeat.Enabled || g.Config().Heartbeat.Addr == "" {
		return
	}
	go syncBuiltinMetrics()
}
# 向HBS同步
// syncBuiltinMetrics never returns: every Heartbeat.Interval seconds it calls
// the "Agent.BuiltinMetrics" RPC on the heartbeat server and turns the
// returned metric list into four settings: URLs to check, ports to watch,
// processes to count and paths to measure with du. The settings are published
// through the g.Set* functions.
//
// A response is ignored when its timestamp is not newer than the last
// accepted one or when its checksum equals the last accepted checksum.
func syncBuiltinMetrics() {
	var timestamp int64 = -1
	var checksum string = "nil"
	duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second
	for {
		time.Sleep(duration)
		// Fresh containers each round: the published settings are replaced,
		// not merged.
		var ports = []int64{}
		var paths = []string{}
		var procs = make(map[string]map[int]string)
		var urls = make(map[string]string)
		hostname, err := g.Hostname()
		if err != nil {
			continue
		}
		req := model.AgentHeartbeatRequest{
			Hostname: hostname,
			Checksum: checksum,
		}
		var resp model.BuiltinMetricResponse
		// RPC method "Agent.BuiltinMetrics"
		err = g.HbsClient.Call("Agent.BuiltinMetrics", req, &resp)
		if err != nil {
			log.Println("ERROR:", err)
			continue
		}
		if resp.Timestamp <= timestamp {
			continue
		}
		if resp.Checksum == checksum {
			continue
		}
		timestamp = resp.Timestamp
		checksum = resp.Checksum
		for _, metric := range resp.Metrics {
			// URL health checks: tags must be exactly two "k=v" pairs; the
			// first value is the URL and the second must parse as an integer
			// (it is stored as the original string).
			// NOTE(review): a URL that itself contains "=" or "," fails the
			// length checks and is skipped silently.
			if metric.Metric == g.URL_CHECK_HEALTH {
				arr := strings.Split(metric.Tags, ",")
				if len(arr) != 2 {
					continue
				}
				url := strings.Split(arr[0], "=")
				if len(url) != 2 {
					continue
				}
				stime := strings.Split(arr[1], "=")
				if len(stime) != 2 {
					continue
				}
				if _, err := strconv.ParseInt(stime[1], 10, 64); err == nil {
					urls[url[1]] = stime[1]
				} else {
					log.Println("metric ParseInt timeout failed:", err)
				}
			}
			// Ports to watch: tags must be a single "k=v" pair whose value is
			// an integer port number.
			if metric.Metric == g.NET_PORT_LISTEN {
				arr := strings.Split(metric.Tags, "=")
				if len(arr) != 2 {
					continue
				}
				if port, err := strconv.ParseInt(arr[1], 10, 64); err == nil {
					ports = append(ports, port)
				} else {
					log.Println("metrics ParseInt failed:", err)
				}
				continue
			}
			// du paths: tags must be a single "k=v" pair whose value is the path.
			if metric.Metric == g.DU_BS {
				arr := strings.Split(metric.Tags, "=")
				if len(arr) != 2 {
					continue
				}
				paths = append(paths, strings.TrimSpace(arr[1]))
				continue
			}
			// Processes to count: each comma-separated tag starting with
			// "name=" is stored under key 1 and each starting with "cmdline="
			// under key 2; the result is keyed by the full tag string.
			if metric.Metric == g.PROC_NUM {
				arr := strings.Split(metric.Tags, ",")
				tmpMap := make(map[int]string)
				for i := 0; i < len(arr); i++ {
					if strings.HasPrefix(arr[i], "name=") {
						tmpMap[1] = strings.TrimSpace(arr[i][5:])
					} else if strings.HasPrefix(arr[i], "cmdline=") {
						tmpMap[2] = strings.TrimSpace(arr[i][8:])
					}
				}
				procs[metric.Tags] = tmpMap
			}
		}
		// Publish the freshly built settings.
		g.SetReportUrls(urls)
		g.SetReportPorts(ports)
		g.SetReportProcs(procs)
		g.SetDuPaths(paths)
	}
}
cron.SyncTrustableIps() 向HBS同步执行脚本的信任IP列表
// SyncTrustableIps starts the background loop that fetches the trusted-IP
// list from the heartbeat server. It does nothing unless heartbeat is enabled
// and a heartbeat address is configured.
func SyncTrustableIps() {
	if !g.Config().Heartbeat.Enabled || g.Config().Heartbeat.Addr == "" {
		return
	}
	go syncTrustableIps()
}
// syncTrustableIps never returns: every Heartbeat.Interval seconds it calls
// the "Agent.TrustableIps" RPC and stores the returned string with
// g.SetTrustableIps. A failed call is logged and the stored value is left
// unchanged.
func syncTrustableIps() {
	interval := time.Duration(g.Config().Heartbeat.Interval) * time.Second

	for {
		time.Sleep(interval)

		var trusted string
		if err := g.HbsClient.Call("Agent.TrustableIps", model.NullRpcRequest{}, &trusted); err != nil {
			log.Println("ERROR: call Agent.TrustableIps fail", err)
			continue
		}
		g.SetTrustableIps(trusted)
	}
}
cron.Collect() 周期性采集数据
// Collect starts one background collector goroutine per entry of
// funcs.Mappers. It does nothing when transfer is disabled or no transfer
// address is configured.
func Collect() {
	if !g.Config().Transfer.Enabled || len(g.Config().Transfer.Addrs) == 0 {
		return
	}

	for idx := range funcs.Mappers {
		group := funcs.Mappers[idx]
		go collect(int64(group.Interval), group.Fs)
	}
}
## 采集与向transfer上报数据 【`关键执行收集与上报Codes`】
// collect never returns: every sec seconds it runs all functions in fns,
// drops metrics whose name is mapped to true in Config().IgnoreMetrics,
// stamps the rest with Step=sec, Endpoint=hostname and the current Unix time,
// and sends them with g.SendToTransfer. A tick is skipped when the hostname
// cannot be determined.
func collect(sec int64, fns []func() []*model.MetricValue) {
	ticker := time.NewTicker(time.Second * time.Duration(sec))
	defer ticker.Stop()

	for range ticker.C {
		hostname, err := g.Hostname()
		if err != nil {
			continue
		}

		collected := []*model.MetricValue{}
		ignored := g.Config().IgnoreMetrics

		// Run every collector in the group and merge the results.
		for _, fn := range fns {
			for _, mv := range fn() {
				if ignored[mv.Metric] {
					continue
				}
				collected = append(collected, mv)
			}
		}

		// Stamp the whole batch with the same timestamp.
		now := time.Now().Unix()
		for _, mv := range collected {
			mv.Step = sec
			mv.Endpoint = hostname
			mv.Timestamp = now
		}

		g.SendToTransfer(collected)
	}
}
http.Start() API与Dashboard服务运行
// init registers all HTTP routes at package load time, before main has run.
func init() {
	configAdminRoutes() // admin routes
	configCpuRoutes() // CPU information routes
	configDfRoutes() // df (mount point) routes
	configHealthRoutes() // health routes
	configIoStatRoutes() // iostat routes
	configKernelRoutes() // kernel routes
	configMemoryRoutes() // memory routes
	configPageRoutes() // dashboard (static page) routes
	configPluginRoutes() // plugin routes
	configPushRoutes() // push routes
	configRunRoutes() // script execution routes
	configSystemRoutes() // system routes
}
// Start runs the agent's HTTP server on Config().Http.Listen. It returns
// immediately when the HTTP server is disabled or no listen address is
// configured. Otherwise it blocks in ListenAndServe and passes the error it
// returns to log.Fatalln.
func Start() {
	if enabled := g.Config().Http.Enabled; !enabled {
		return
	}

	listen := g.Config().Http.Listen
	if listen == "" {
		return
	}

	server := new(http.Server)
	server.Addr = listen
	server.MaxHeaderBytes = 1 << 30

	log.Println("listening", listen)
	log.Fatalln(server.ListenAndServe())
}
# 配置http路由(Admin管理)
// configAdminRoutes registers the administrative endpoints:
//
//	/exit          - replies "exiting..." and exits the process one second later
//	/config/reload - re-parses g.ConfigFile and returns the resulting config
//	/workdir       - returns file.SelfDir()
//	/ips           - returns the trusted-IP list
//
// /exit and /config/reload reply "no privilege" unless g.IsTrustable accepts
// the request's RemoteAddr. /workdir and /ips perform no such check.
func configAdminRoutes() {
	http.HandleFunc("/exit", func(w http.ResponseWriter, r *http.Request) {
		if !g.IsTrustable(r.RemoteAddr) {
			w.Write([]byte("no privilege"))
			return
		}
		w.Write([]byte("exiting..."))
		// Exit from a goroutine after a delay so the handler can return first.
		go func() {
			time.Sleep(time.Second)
			os.Exit(0)
		}()
	})

	http.HandleFunc("/config/reload", func(w http.ResponseWriter, r *http.Request) {
		if !g.IsTrustable(r.RemoteAddr) {
			w.Write([]byte("no privilege"))
			return
		}
		g.ParseConfig(g.ConfigFile)
		RenderDataJson(w, g.Config())
	})

	http.HandleFunc("/workdir", func(w http.ResponseWriter, r *http.Request) {
		RenderDataJson(w, file.SelfDir())
	})

	http.HandleFunc("/ips", func(w http.ResponseWriter, r *http.Request) {
		RenderDataJson(w, g.TrustableIps())
	})
}
# 配置http路由(Dashboard静态页)
func configPageRoutes() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if strings.HasSuffix(r.URL.Path, "/") {
if !file.IsExist(filepath.Join(g.Root, "/public", r.URL.Path, "index.html")) {
http.NotFound(w, r)
return
}
}
http.FileServer(http.Dir(filepath.Join(g.Root, "/public"))).ServeHTTP(w, r)
})
}
三 默认采集模块分析
- AgentMetrics -- agent alive状态采集
// AgentMetrics returns a single gauge, "agent.alive", with the constant
// value 1.
func AgentMetrics() []*model.MetricValue {
	alive := GaugeValue("agent.alive", 1)
	return []*model.MetricValue{alive}
}
- CpuMetrics -- CPU相关信息采集
# 在main入口后台线程已周期性收集CPU状态,保存在私有procStatHistory内
# go cron.InitDataHistory()-> UpdateCpuStat()
# 解析procStatHistory历史数据
# 比如CpuIdle值获取
// CpuIdle returns the idle CPU percentage between the two newest history
// samples: the idle delta (procStatHistory[0] minus procStatHistory[1])
// multiplied by 100/deltaTotal(). It returns 0 when deltaTotal() is 0.
func CpuIdle() float64 {
	psLock.RLock()
	defer psLock.RUnlock()

	total := deltaTotal()
	if total == 0 {
		return 0.0
	}

	percentPerTick := 100.00 / float64(total)
	idleDelta := procStatHistory[0].Cpu.Idle - procStatHistory[1].Cpu.Idle
	return float64(idleDelta) * percentPerTick
}
func CpuMetrics() []*model.MetricValue {
if !CpuPrepared() {
return []*model.MetricValue{}
}
# CPU信息: idle, busy, user, nice, system, iowait, irq, softirq, steal, guest, switches
cpuIdleVal := CpuIdle()
idle := GaugeValue("cpu.idle", cpuIdleVal) //空闲值
busy := GaugeValue("cpu.busy", 100.0-cpuIdleVal) //繁忙值
user := GaugeValue("cpu.user", CpuUser()) // 用户使用CPU
nice := GaugeValue("cpu.nice", CpuNice()) //CPU NICE值
system := GaugeValue("cpu.system", CpuSystem()) //系统使用CPU
iowait := GaugeValue("cpu.iowait", CpuIowait()) //IO等待
irq := GaugeValue("cpu.irq", CpuIrq()) //IO中断请求
softirq := GaugeValue("cpu.softirq", CpuSoftIrq()) //软中断请求
steal := GaugeValue("cpu.steal", CpuSteal()) //Steal值
guest := GaugeValue("cpu.guest", CpuGuest()) //Guest值
switches := CounterValue("cpu.switches", CurrentCpuSwitches()) //交换值
return []*model.MetricValue{idle, busy, user, nice, system, iowait, irq, softirq, steal, guest, switches}
}
- NetMetrics 网卡相关信息采集
# 依据配置网卡接口信息采集数据
// NetMetrics collects network-interface metrics for the interface prefixes
// configured in Collector.IfacePrefix.
func NetMetrics() []*model.MetricValue {
	prefixes := g.Config().Collector.IfacePrefix
	return CoreNetMetrics(prefixes)
}
#引用"github.com/toolkits/nux"采集网卡信息
#实现读取与解析"/proc/net/dev"信息
// CoreNetMetrics returns 26 metrics, tagged "iface=<name>", for every network
// interface that nux.NetIfs returns for ifacePrefix. If nux.NetIfs fails, the
// error is logged and an empty slice is returned.
//
// The three *.bits metrics are the matching byte counters multiplied by 8.
func CoreNetMetrics(ifacePrefix []string) []*model.MetricValue {
	netIfs, err := nux.NetIfs(ifacePrefix)
	if err != nil {
		log.Println(err)
		return []*model.MetricValue{}
	}

	metrics := make([]*model.MetricValue, 0, len(netIfs)*26)
	for _, netIf := range netIfs {
		iface := "iface=" + netIf.Iface
		metrics = append(metrics,
			// inbound
			CounterValue("net.if.in.bytes", netIf.InBytes, iface),
			CounterValue("net.if.in.packets", netIf.InPackages, iface),
			CounterValue("net.if.in.errors", netIf.InErrors, iface),
			CounterValue("net.if.in.dropped", netIf.InDropped, iface),
			CounterValue("net.if.in.fifo.errs", netIf.InFifoErrs, iface),
			CounterValue("net.if.in.frame.errs", netIf.InFrameErrs, iface),
			CounterValue("net.if.in.compressed", netIf.InCompressed, iface),
			CounterValue("net.if.in.multicast", netIf.InMulticast, iface),
			// outbound
			CounterValue("net.if.out.bytes", netIf.OutBytes, iface),
			CounterValue("net.if.out.packets", netIf.OutPackages, iface),
			CounterValue("net.if.out.errors", netIf.OutErrors, iface),
			CounterValue("net.if.out.dropped", netIf.OutDropped, iface),
			CounterValue("net.if.out.fifo.errs", netIf.OutFifoErrs, iface),
			CounterValue("net.if.out.collisions", netIf.OutCollisions, iface),
			CounterValue("net.if.out.carrier.errs", netIf.OutCarrierErrs, iface),
			CounterValue("net.if.out.compressed", netIf.OutCompressed, iface),
			// totals
			CounterValue("net.if.total.bytes", netIf.TotalBytes, iface),
			CounterValue("net.if.total.packets", netIf.TotalPackages, iface),
			CounterValue("net.if.total.errors", netIf.TotalErrors, iface),
			CounterValue("net.if.total.dropped", netIf.TotalDropped, iface),
			// speed and utilisation
			GaugeValue("net.if.speed.bits", netIf.SpeedBits, iface),
			CounterValue("net.if.in.percent", netIf.InPercent, iface),
			CounterValue("net.if.out.percent", netIf.OutPercent, iface),
			// byte counters expressed in bits
			CounterValue("net.if.in.bits", netIf.InBytes*8, iface),
			CounterValue("net.if.out.bits", netIf.OutBytes*8, iface),
			CounterValue("net.if.total.bits", netIf.TotalBytes*8, iface),
		)
	}
	return metrics
}
- KernelMetrics 系统内核相关信息采集
#引用"github.com/toolkits/nux"采集内核信息
#实现读取"/proc/sys/fs/file-max"
# "/proc/sys/fs/file-nr"
# "/proc/sys/kernel/pid_max"
// KernelMetrics returns kernel.maxfiles, kernel.maxproc,
// kernel.files.allocated and kernel.files.left (maxfiles minus allocated).
//
// The values are read one after another. If a read fails, the error is logged
// and only the metrics gathered up to that point are returned.
func KernelMetrics() (L []*model.MetricValue) {
	maxFiles, err := nux.KernelMaxFiles()
	if err != nil {
		log.Println(err)
		return L
	}
	L = append(L, GaugeValue("kernel.maxfiles", maxFiles))

	maxProc, err := nux.KernelMaxProc()
	if err != nil {
		log.Println(err)
		return L
	}
	L = append(L, GaugeValue("kernel.maxproc", maxProc))

	allocated, err := nux.KernelAllocateFiles()
	if err != nil {
		log.Println(err)
		return L
	}
	L = append(L,
		GaugeValue("kernel.files.allocated", allocated),
		GaugeValue("kernel.files.left", maxFiles-allocated),
	)
	return L
}
- LoadAvgMetrics -- CPU负载信息采集
#引用"github.com/toolkits/nux"采集CPU负载信息
#实现读取与解析 "/proc/loadavg"信息
#
// LoadAvgMetrics returns the 1, 5 and 15 minute load averages as load.1min,
// load.5min and load.15min. If nux.LoadAvg fails, the error is logged and nil
// is returned.
func LoadAvgMetrics() []*model.MetricValue {
	load, err := nux.LoadAvg()
	if err != nil {
		log.Println(err)
		return nil
	}

	oneMin := GaugeValue("load.1min", load.Avg1min)
	fiveMin := GaugeValue("load.5min", load.Avg5min)
	fifteenMin := GaugeValue("load.15min", load.Avg15min)
	return []*model.MetricValue{oneMin, fiveMin, fifteenMin}
}
- MemMetrics -- 内存信息采集
#引用"github.com/toolkits/nux"采集内存信息
#实现读取与解析"/proc/meminfo"信息
// MemMetrics returns memory and swap totals, usage and percentages from
// nux.MemInfo. If nux.MemInfo fails, the error is logged and nil is returned.
//
// Free memory is MemFree + Buffers + Cached, and used memory is the total
// minus that. Each percentage is 0 when its total is 0.
func MemMetrics() []*model.MetricValue {
	m, err := nux.MemInfo()
	if err != nil {
		log.Println(err)
		return nil
	}

	memFree := m.MemFree + m.Buffers + m.Cached
	memUsed := m.MemTotal - memFree

	var pmemFree, pmemUsed float64
	if m.MemTotal != 0 {
		total := float64(m.MemTotal)
		pmemFree = float64(memFree) * 100.0 / total
		pmemUsed = float64(memUsed) * 100.0 / total
	}

	var pswapFree, pswapUsed float64
	if m.SwapTotal != 0 {
		total := float64(m.SwapTotal)
		pswapFree = float64(m.SwapFree) * 100.0 / total
		pswapUsed = float64(m.SwapUsed) * 100.0 / total
	}

	return []*model.MetricValue{
		// absolute values
		GaugeValue("mem.memtotal", m.MemTotal),
		GaugeValue("mem.memused", memUsed),
		GaugeValue("mem.memfree", memFree),
		GaugeValue("mem.swaptotal", m.SwapTotal),
		GaugeValue("mem.swapused", m.SwapUsed),
		GaugeValue("mem.swapfree", m.SwapFree),
		// percentages of the respective total
		GaugeValue("mem.memfree.percent", pmemFree),
		GaugeValue("mem.memused.percent", pmemUsed),
		GaugeValue("mem.swapfree.percent", pswapFree),
		GaugeValue("mem.swapused.percent", pswapUsed),
	}
}
- DiskIOMetrics -- 磁盘IO信息采集
#引用"github.com/toolkits/nux"采集DISK信息
#实现读取与解析"/proc/diskstats"信息
// DiskIOMetrics returns the disk I/O counters from nux.ListDiskStats, tagged
// "device=<name>", for every device accepted by ShouldHandleDevice. If the
// listing fails, the error is logged and nothing is returned.
func DiskIOMetrics() (L []*model.MetricValue) {
	dsList, err := nux.ListDiskStats()
	if err != nil {
		log.Println(err)
		return L
	}

	for _, ds := range dsList {
		if !ShouldHandleDevice(ds.Device) {
			continue
		}

		device := "device=" + ds.Device
		L = append(L,
			// reads
			CounterValue("disk.io.read_requests", ds.ReadRequests, device),
			CounterValue("disk.io.read_merged", ds.ReadMerged, device),
			CounterValue("disk.io.read_sectors", ds.ReadSectors, device),
			CounterValue("disk.io.msec_read", ds.MsecRead, device),
			// writes
			CounterValue("disk.io.write_requests", ds.WriteRequests, device),
			CounterValue("disk.io.write_merged", ds.WriteMerged, device),
			CounterValue("disk.io.write_sectors", ds.WriteSectors, device),
			CounterValue("disk.io.msec_write", ds.MsecWrite, device),
			// in-flight and time totals
			CounterValue("disk.io.ios_in_progress", ds.IosInProgress, device),
			CounterValue("disk.io.msec_total", ds.MsecTotal, device),
			CounterValue("disk.io.msec_weighted_total", ds.MsecWeightedTotal, device),
		)
	}
	return L
}
- IOStatsMetrics -- 磁盘IO信息采集
# 在main入口后台线程已周期性收集磁盘状态,保存在私有diskStatsMap内
# go cron.InitDataHistory()-> UpdateDiskStats()
# 私有变量diskStatsMap保存周期性数据
# 解析 diskStatsMap 获取IO状态数据
// IOStatsMetrics derives disk statistics from the two newest samples in
// diskStatsMap, tagged "device=<name>", for every device accepted by
// ShouldHandleDevice: read/write bytes, average request size, average queue
// size, await, service time and utilisation.
//
// All inputs are deltas obtained through IODelta. The per-request averages
// are 0 when no I/O completed in the window, and utilisation is 0 when the
// elapsed time delta is 0.
func IOStatsMetrics() (L []*model.MetricValue) {
	dsLock.RLock()
	defer dsLock.RUnlock()

	for device := range diskStatsMap {
		if !ShouldHandleDevice(device) {
			continue
		}

		tags := "device=" + device
		rio := IODelta(device, IOReadRequests)
		wio := IODelta(device, IOWriteRequests)
		delta_rsec := IODelta(device, IOReadSectors)
		delta_wsec := IODelta(device, IOWriteSectors)
		ruse := IODelta(device, IOMsecRead)
		wuse := IODelta(device, IOMsecWrite)
		use := IODelta(device, IOMsecTotal)
		n_io := rio + wio

		avgrq_sz := 0.0
		await := 0.0
		svctm := 0.0
		if n_io != 0 {
			avgrq_sz = float64(delta_rsec+delta_wsec) / float64(n_io)
			await = float64(ruse+wuse) / float64(n_io)
			svctm = float64(use) / float64(n_io)
		}

		duration := IODelta(device, TS)

		// Guard the division like the n_io case above. With a zero duration
		// the unguarded expression evaluates to NaN (0/0) or +Inf, and NaN
		// also slips past the "> 100" clamp.
		util := 0.0
		if duration != 0 {
			util = float64(use) * 100.0 / float64(duration)
			if util > 100.0 {
				util = 100.0
			}
		}

		L = append(L, GaugeValue("disk.io.read_bytes", float64(delta_rsec)*512.0, tags))  // sectors read * 512
		L = append(L, GaugeValue("disk.io.write_bytes", float64(delta_wsec)*512.0, tags)) // sectors written * 512
		L = append(L, GaugeValue("disk.io.avgrq_sz", avgrq_sz, tags))                     // sectors per request
		L = append(L, GaugeValue("disk.io.avgqu-sz", float64(IODelta(device, IOMsecWeightedTotal))/1000.0, tags))
		L = append(L, GaugeValue("disk.io.await", await, tags)) // read+write time per request
		L = append(L, GaugeValue("disk.io.svctm", svctm, tags)) // total I/O time per request
		L = append(L, GaugeValue("disk.io.util", util, tags))   // I/O time as a percentage of elapsed time, capped at 100
	}
	return
}
- NetstatMetrics -- TCP信息采集
#引用"github.com/toolkits/nux"采集TCP信息
#实现为读"/proc/net/netstat"信息并解析
// NetstatMetrics returns one counter, "TcpExt.<key>", for each key that
// nux.Netstat("TcpExt") returns and that is also present in USES. If the read
// fails, the error is logged and nothing is returned.
func NetstatMetrics() (L []*model.MetricValue) {
	tcpExts, err := nux.Netstat("TcpExt")
	if err != nil {
		log.Println(err)
		return L
	}

	for name, value := range tcpExts {
		if _, wanted := USES[name]; wanted {
			L = append(L, CounterValue("TcpExt."+name, value))
		}
	}
	return L
}
- ProcMetrics -- 采集Proc相关信息
# main入口syncBuiltinMetrics()从HBS同步SetReportProcs()
# g.ReportProcs()获取Procs列表map
# 获取"/proc"目录下"/proc/%pid/cmdline"
// ProcMetrics returns one g.PROC_NUM gauge per entry of g.ReportProcs(): the
// number of running processes that match the entry according to is_a, tagged
// with the entry's key. It returns nothing when there are no entries or when
// the process list cannot be read (the error is logged).
func ProcMetrics() (L []*model.MetricValue) {
	wanted := g.ReportProcs()
	if len(wanted) == 0 {
		return L
	}

	running, err := nux.AllProcs()
	if err != nil {
		log.Println(err)
		return L
	}

	for tags, matcher := range wanted {
		matched := 0
		for _, proc := range running {
			if is_a(proc, matcher) {
				matched++
			}
		}
		L = append(L, GaugeValue(g.PROC_NUM, matched, tags))
	}
	return L
}
- UdpMetrics UDP接收和发送数据报采集
# 实现获取"/proc/net/snmp"信息UDP接收和发送数据报
// UdpMetrics returns one counter, "snmp.Udp.<key>", for every key returned by
// nux.Snmp("Udp"). If the read fails, the error is logged and an empty slice
// is returned.
func UdpMetrics() []*model.MetricValue {
	udp, err := nux.Snmp("Udp")
	if err != nil {
		log.Println("read snmp fail", err)
		return []*model.MetricValue{}
	}

	metrics := make([]*model.MetricValue, 0, len(udp))
	for name, value := range udp {
		metrics = append(metrics, CounterValue("snmp.Udp."+name, value))
	}
	return metrics
}
- DeviceMetrics -- 挂载设备信息采集
# 实现获取与解析"/proc/mounts"信息
// DeviceMetrics returns df.* metrics for every mount point reported by
// nux.ListMountPoint, tagged "mount=<dir>,fstype=<type>".
//
// When Collector.MountPoint is non-empty, only the listed mount points are
// reported. Mount points whose usage cannot be built or whose BlocksAll is 0
// are skipped, and inode metrics are omitted when InodesAll is 0. If anything
// was reported, three aggregate df.statistics.* metrics over all reported
// mount points are appended.
func DeviceMetrics() (L []*model.MetricValue) {
	mountPoints, err := nux.ListMountPoint()
	if err != nil {
		log.Error("collect device metrics fail:", err)
		return L
	}

	// Optional whitelist from the configuration.
	configured := make(map[string]bool)
	for _, mp := range g.Config().Collector.MountPoint {
		configured[mp] = true
	}

	var totalBlocks, usedBlocks uint64
	for _, entry := range mountPoints {
		fsSpec, fsFile, fsVfstype := entry[0], entry[1], entry[2]

		if len(configured) > 0 {
			if _, listed := configured[fsFile]; !listed {
				log.Debug("mount point not matched with config", fsFile, "ignored.")
				continue
			}
		}

		du, buildErr := nux.BuildDeviceUsage(fsSpec, fsFile, fsVfstype)
		if buildErr != nil {
			log.Error(buildErr)
			continue
		}
		if du.BlocksAll == 0 {
			continue
		}

		totalBlocks += du.BlocksAll
		usedBlocks += du.BlocksUsed

		tags := fmt.Sprintf("mount=%s,fstype=%s", du.FsFile, du.FsVfstype)
		L = append(L,
			GaugeValue("df.bytes.total", du.BlocksAll, tags),
			GaugeValue("df.bytes.used", du.BlocksUsed, tags),
			GaugeValue("df.bytes.free", du.BlocksFree, tags),
			GaugeValue("df.bytes.used.percent", du.BlocksUsedPercent, tags),
			GaugeValue("df.bytes.free.percent", du.BlocksFreePercent, tags),
		)

		if du.InodesAll != 0 {
			L = append(L,
				GaugeValue("df.inodes.total", du.InodesAll, tags),
				GaugeValue("df.inodes.used", du.InodesUsed, tags),
				GaugeValue("df.inodes.free", du.InodesFree, tags),
				GaugeValue("df.inodes.used.percent", du.InodesUsedPercent, tags),
				GaugeValue("df.inodes.free.percent", du.InodesFreePercent, tags),
			)
		}
	}

	if len(L) > 0 && totalBlocks > 0 {
		L = append(L,
			GaugeValue("df.statistics.total", float64(totalBlocks)),
			GaugeValue("df.statistics.used", float64(usedBlocks)),
			GaugeValue("df.statistics.used.percent", float64(usedBlocks)*100.0/float64(totalBlocks)),
		)
	}
	return L
}
- PortMetrics -- TCP / UDP 端口信息采集
# main入口syncBuiltinMetrics()-> g.SetReportPorts(ports)从HBS同步检测端口列表
# g.ReportPorts() 获取列表
# 实现获取TCP "ss -t -l -n" /获取UDP "ss -u -a -n"监听信息
// PortMetrics reports, for every port returned by g.ReportPorts(), whether
// that port appears in the TCP or UDP port lists obtained from nux.
//
// One g.NET_PORT_LISTEN gauge tagged "port=<n>" is produced per configured
// port: 1 when the port is found in either list, 0 otherwise. The result is
// nil when no ports are configured, or when either port listing fails (the
// error is logged and nothing is reported).
func PortMetrics() (L []*model.MetricValue) {
	wanted := g.ReportPorts()
	if len(wanted) == 0 {
		return
	}

	tcpPorts, err := nux.TcpPorts()
	if err != nil {
		log.Println(err)
		return
	}

	udpPorts, err := nux.UdpPorts()
	if err != nil {
		log.Println(err)
		return
	}

	for _, port := range wanted {
		// A port counts as listening if it is present in either protocol's list.
		listening := 0
		if slice.ContainsInt64(tcpPorts, port) || slice.ContainsInt64(udpPorts, port) {
			listening = 1
		}
		L = append(L, GaugeValue(g.NET_PORT_LISTEN, listening, fmt.Sprintf("port=%d", port)))
	}
	return
}
- SocketStatSummaryMetrics -- socket统计信息采集
# 实现获取socket统计信息 "ss -s"
// SocketStatSummaryMetrics converts the map returned by
// nux.SocketStatSummary into gauges, one per entry, named "ss.<key>".
//
// If the summary cannot be obtained the error is logged and nil is returned.
func SocketStatSummaryMetrics() (L []*model.MetricValue) {
	summary, err := nux.SocketStatSummary()
	if err != nil {
		log.Println(err)
		return
	}

	for name, value := range summary {
		metric := "ss." + name
		L = append(L, GaugeValue(metric, value))
	}
	return
}
- DuMetrics -- 目录占用空间采集(du -bs)
# main入口syncBuiltinMetrics()从HBS同步g.SetDuPaths(paths)
# g.DuPaths()获取待统计占用空间的目录路径列表(对每个路径执行 du -bs)
// DuMetrics measures every path returned by g.DuPaths() with "du -bs",
// running one goroutine per path, and returns one g.DU_BS gauge per path
// tagged "path=<path>".
//
// The gauge value is the byte count reported by du. When measuring a path
// fails for any reason (command cannot start, timeout, output on stderr,
// non-zero exit, unparsable output) the error is logged and the gauge value
// is -1 for that path.
func DuMetrics() (L []*model.MetricValue) {
	paths := g.DuPaths()
	// Buffered to len(paths): every goroutine sends exactly one value, so no
	// send can block even though nothing receives until all goroutines finish.
	result := make(chan *model.MetricValue, len(paths))
	var wg sync.WaitGroup

	for _, path := range paths {
		wg.Add(1)
		// The path is passed as an argument and the goroutine only uses its
		// own copy; before Go 1.22 the loop variable is shared between
		// iterations and must not be read from inside the goroutine.
		go func(dir string) {
			defer wg.Done()

			size, err := duSizeBytes(dir)
			if err != nil {
				log.Println(err)
				result <- GaugeValue(g.DU_BS, -1, "path="+dir)
				return
			}
			result <- GaugeValue(g.DU_BS, size, "path="+dir)
		}(path)
	}

	wg.Wait()
	// All senders are done, so the channel can be closed and drained.
	close(result)
	for mv := range result {
		L = append(L, mv)
	}
	return
}

// duSizeBytes runs "du -bs dir" and returns the byte count from the first
// field of its output. The run is bounded by the package-level timeout
// (interpreted as seconds), which is defined elsewhere in the package.
func duSizeBytes(dir string) (uint64, error) {
	//tips:osx does not support -b.
	cmd := exec.Command("du", "-bs", dir)
	var stdout, stderr bytes.Buffer
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr

	if err := cmd.Start(); err != nil {
		return 0, err
	}

	err, isTimeout := sys.CmdRunWithTimeout(cmd, time.Duration(timeout)*time.Second)
	if isTimeout {
		return 0, fmt.Errorf("exec cmd : du -bs %s timeout", dir)
	}

	// Anything written to stderr is treated as a failure and takes
	// precedence over the exit status.
	if errStr := stderr.String(); errStr != "" {
		return 0, errors.New(errStr)
	}
	if err != nil {
		return 0, fmt.Errorf("du -bs %s failed: %s", dir, err.Error())
	}

	fields := strings.Fields(stdout.String())
	if len(fields) < 2 {
		// Previously this error was constructed but never assigned, so the
		// failure was silently dropped.
		return 0, fmt.Errorf("du -bs %s failed: %s", dir, "return fields < 2")
	}

	size, err := strconv.ParseUint(fields[0], 10, 64)
	if err != nil {
		return 0, fmt.Errorf("cannot parse du -bs %s output", dir)
	}
	return size, nil
}
- UrlMetrics -- URL健康检测采集
// UrlMetrics probes every URL returned by g.ReportUrls() and returns one
// g.URL_CHECK_HEALTH gauge per URL: 1 when probeUrl reports success, 0
// otherwise.
//
// Each gauge is tagged "url=<url>,timeout=<timeout>,src=<hostname>", where
// the timeout is the map value associated with the URL and the hostname
// comes from g.Hostname() ("None" if it cannot be determined). Returns nil
// when there are no URLs to check.
func UrlMetrics() (L []*model.MetricValue) {
	// URL list periodically synchronised from HBS.
	urls := g.ReportUrls()
	if len(urls) == 0 {
		return
	}

	hostname, err := g.Hostname()
	if err != nil {
		hostname = "None"
	}

	for target, limit := range urls {
		tags := fmt.Sprintf("url=%v,timeout=%v,src=%v", target, limit, hostname)

		health := 1
		if ok, _ := probeUrl(target, limit); !ok {
			health = 0
		}
		L = append(L, GaugeValue(g.URL_CHECK_HEALTH, health, tags))
	}
	return
}
// probeUrl checks a URL by running the system curl command and inspecting
// the HTTP status code it prints.
//
// timeout is handed to curl's -m option unchanged. The probe succeeds only
// when the first line of curl's output, trimmed of whitespace, is exactly
// "200". A non-nil error is returned only when curl could not be run or its
// output could not be read; a status other than 200 yields (false, nil).
func probeUrl(furl string, timeout string) (bool, error) {
	out, err := sys.CmdOutBytes(
		"curl",
		"--max-filesize", "102400",
		"-I",
		"-m", timeout,
		"-o", "/dev/null",
		"-s",
		"-w", "%{http_code}",
		furl,
	)
	if err != nil {
		log.Printf("probe url [%v] failed.the err is: [%v]\n", furl, err)
		return false, err
	}

	line, err := file.ReadLine(bufio.NewReader(bytes.NewBuffer(out)))
	if err != nil {
		log.Println("read retcode failed.err is:", err)
		return false, err
	}

	code := string(line)
	if strings.TrimSpace(code) == "200" {
		return true, nil
	}

	log.Printf("return code [%v] is not 200.query url is [%v]", code, furl)
	return false, nil
}
- GpuMetrics -- GPU信息采集
// GpuMetrics collects per-GPU gauges through the external library
// "github.com/mindprince/gonvml", which needs libnvidia-ml.so.1 to be
// loadable.
//
// For each device it reports gpu.fan.speed (when available),
// gpu.temperature, gpu.memory.total, gpu.memory.used, gpu.memory.util,
// gpu.util and gpu.power.usage, all tagged "uuid=<uuid>". After the devices
// it appends the untagged gpu.count, gpu.util.avg and gpu.memory.util.avg.
//
// Nothing is returned when gonvml cannot be initialised (logged only in
// debug mode), when the device count cannot be read, or when it is zero. A
// device whose handle, temperature, memory info or utilisation cannot be
// read is skipped; it still counts in the divisor of the averages.
func GpuMetrics() (L []*model.MetricValue) {
	if err := gonvml.Initialize(); err != nil {
		if g.Config().Debug {
			log.Println("Initialize error: ", err)
		}
		return
	}
	defer gonvml.Shutdown()

	count, err := gonvml.DeviceCount()
	if err != nil {
		log.Println("DeviceCount error: ", err)
		return
	}
	if count == 0 {
		return
	}

	// Running sums used for the averages reported after the loop.
	var utilSum, memUtilSum uint

	for i := 0; i < int(count); i++ {
		dev, err := gonvml.DeviceHandleByIndex(uint(i))
		if err != nil {
			log.Println("DeviceHandleByIndex error:", err)
			continue
		}

		// A UUID failure is only logged; the device is still processed with
		// whatever value was returned.
		uuid, err := dev.UUID()
		if err != nil {
			log.Println("dev.UUID error", err)
		}
		tag := "uuid=" + uuid

		// Not every GPU has a fan, so a failure here is not fatal for the device.
		fanSpeed, err := dev.FanSpeed()
		if err == nil {
			L = append(L, GaugeValue("gpu.fan.speed", fanSpeed, tag))
		} else {
			log.Println("dev.FanSpeed error: ", err)
		}

		temperature, err := dev.Temperature()
		if err != nil {
			log.Println("dev.Temperature error: ", err)
			continue
		}

		totalMemory, usedMemory, err := dev.MemoryInfo()
		if err != nil {
			log.Println("dev.MemoryInfo error: ", err)
			continue
		}
		// Convert to mebibytes; the division is done on integers, so the
		// result is truncated to a whole number before becoming a float.
		totalMB := float64(totalMemory / 1024 / 1024)
		usedMB := float64(usedMemory / 1024 / 1024)

		gpuUtilization, memoryUtilization, err := dev.UtilizationRates()
		if err != nil {
			log.Println("dev.UtilizationRates error: ", err)
			continue
		}
		utilSum += gpuUtilization
		memUtilSum += memoryUtilization

		// A power reading failure is only logged; the returned value is
		// still reported.
		powerUsage, err := dev.PowerUsage()
		if err != nil {
			log.Println("dev.PowerUsage error: ", err)
		}
		// Convert to watts (integer division, truncated).
		powerWatt := float64(powerUsage / 1000)

		L = append(L,
			GaugeValue("gpu.temperature", temperature, tag),
			GaugeValue("gpu.memory.total", totalMB, tag),
			GaugeValue("gpu.memory.used", usedMB, tag),
			GaugeValue("gpu.memory.util", memoryUtilization, tag),
			GaugeValue("gpu.util", gpuUtilization, tag),
			GaugeValue("gpu.power.usage", powerWatt, tag),
		)
	}

	L = append(L,
		GaugeValue("gpu.count", count),
		GaugeValue("gpu.util.avg", utilSum/count),
		GaugeValue("gpu.memory.util.avg", memUtilSum/count),
	)
	return L
}
四 思考与查证
- 请查看源码,查证Agent所关联的组件HBS/Transfer哪个是集群模式?哪个是单点模式?为什么?
- 如果需要自定义Agent的采集数据有哪几种方式?从源码角度又应该如何添加相应的采集Code?
- 从源码角度聊一聊:Agent组件中有哪些代码技术点或设计模式值得学习与借鉴?
网友评论