美文网首页
OpenFalcon源码分析(Transfer组件)

OpenFalcon源码分析(Transfer组件)

作者: Xiao_Yang | 来源:发表于2018-09-17 08:48 被阅读0次

Transfer版本

VERSION = "0.0.17"
COMMIT = "e249d8a"

Transfer组件功能

接收RPC/TCP服务发送过来的采集数据归整、入队列、统计、转发数据至后端Graph、Judge或TSDB组件。

Transfer组件逻辑图

Transfer逻辑图

main入口分析

// main wires the transfer process together: parse flags, load the
// global config, start the statistics module, the sender and receiver
// pipelines and the HTTP API, then block forever.
func main() {
    // Command-line flags.
    cfgPath := flag.String("c", "cfg.json", "configuration file")
    showVersion := flag.Bool("v", false, "show version")
    showGitVersion := flag.Bool("vg", false, "show version")
    flag.Parse()

    // Print version information and exit when requested.
    if *showVersion {
        fmt.Println(g.VERSION)
        os.Exit(0)
    }
    if *showGitVersion {
        fmt.Println(g.VERSION, g.COMMIT)
        os.Exit(0)
    }

    // Load and validate the global configuration.
    g.ParseConfig(*cfgPath)

    // Statistics module; used by both sender and receiver.
    proc.Start()

    // Core pipelines: start the outbound sender first, then accept data.
    sender.Start()
    receiver.Start()

    // HTTP API server (health/version/config/push endpoints).
    http.Start()

    // Block forever; all work runs in background goroutines.
    select {}
}

g.ParseConfig(*cfg) 全局配置解析

# 默认配置为当前目录下cfg.json 
// ParseConfig loads the JSON configuration file at path cfg into the
// package-level config (guarded by configLock), expanding the judge and
// graph cluster strings into per-node address lists. Any error aborts
// the process. Defaults to ./cfg.json via the -c flag.
func ParseConfig(cfg string) {
    if cfg == "" {
        log.Fatalln("use -c to specify configuration file")
    }
    // The file must exist before we try to read it.
    if !file.IsExist(cfg) {
        log.Fatalln("config file:", cfg, "is not existent. maybe you need `mv cfg.example.json cfg.json`")
    }

    ConfigFile = cfg

    // Read the whole file as a trimmed string.
    configContent, err := file.ToTrimString(cfg)
    if err != nil {
        log.Fatalln("read config file:", cfg, "fail:", err)
    }

    // Deserialize JSON into the config struct.
    var parsed GlobalConfig
    if err = json.Unmarshal([]byte(configContent), &parsed); err != nil {
        log.Fatalln("parse config file:", cfg, "fail:", err)
    }

    // Expand "host1,host2" cluster strings into address slices.
    parsed.Judge.ClusterList = formatClusterItems(parsed.Judge.Cluster)
    parsed.Graph.ClusterList = formatClusterItems(parsed.Graph.Cluster)

    configLock.Lock()
    defer configLock.Unlock()
    config = &parsed

    log.Println("g.ParseConfig ok, file ", cfg)
}

# 如map["node"]="host1,host2"-->map["node"]=["host1", "host2"]
# 将主机信息字符串以","作为分隔符截为slice
// formatClusterItems expands a node->"host1,host2" map into a
// node->*ClusterNode map: each value is split on "," and every address
// is stripped of surrounding whitespace.
// e.g. map["node"]="host1, host2" -> map["node"].Addrs == ["host1","host2"].
func formatClusterItems(cluster map[string]string) map[string]*ClusterNode {
    // Pre-size the result map and each address slice (idiom/perf fix:
    // the original allocated both unsized and grew them incrementally).
    ret := make(map[string]*ClusterNode, len(cluster))
    for node, clusterStr := range cluster {
        items := strings.Split(clusterStr, ",") // split the address list
        nitems := make([]string, 0, len(items))
        for _, item := range items {
            nitems = append(nitems, strings.TrimSpace(item)) // trim whitespace
        }
        ret[node] = NewClusterNode(nitems) // &ClusterNode{nitems}
    }
    return ret
}

# 全局配置结构体
// GlobalConfig mirrors cfg.json: global flags plus one sub-configuration
// per component (http/rpc/socket listeners and the judge/graph/tsdb
// backends).
type GlobalConfig struct {
    Debug   bool          `json:"debug"`
    MinStep int           `json:"minStep"` // minimum reporting period, seconds
    Http    *HttpConfig   `json:"http"`
    Rpc     *RpcConfig    `json:"rpc"`
    Socket  *SocketConfig `json:"socket"`
    Judge   *JudgeConfig  `json:"judge"`
    Graph   *GraphConfig  `json:"graph"`
    Tsdb    *TsdbConfig   `json:"tsdb"`
}

http.Start() HTTP API服务启动与监听处理

# 后台线程执行
// Start launches the HTTP API server in a background goroutine.
func Start() {
    go startHttpServer()
}

// startHttpServer registers all HTTP routes and serves the API on the
// configured listen address. It returns early when HTTP is disabled or
// no address is configured; otherwise it blocks in ListenAndServe and
// aborts the process if the server fails.
func startHttpServer() {
    if !g.Config().Http.Enabled {
        return
    }

    addr := g.Config().Http.Listen
    if addr == "" {
        return
    }

    // Register the route groups.
    configCommonRoutes()    // common endpoints (health/version/workdir/config)
    configProcHttpRoutes()  // statistics endpoints
    configDebugHttpRoutes() // debug endpoints
    configApiRoutes()       // "/api/push" data-upload endpoint

    srv := &http.Server{
        Addr:           addr,
        MaxHeaderBytes: 1 << 30,
    }

    log.Println("http.startHttpServer ok, listening", addr)
    log.Fatalln(srv.ListenAndServe()) // serve forever; fatal on error
}

# 公共API接口路由
func configCommonRoutes() {
    //健康检测
    http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {})
    //查询版本信息
    http.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {})
    //工作目录查询
    http.HandleFunc("/workdir", func(w http.ResponseWriter, r *http.Request) {})
    //全局配置项信息
    http.HandleFunc("/config", func(w http.ResponseWriter, r *http.Request) {})
    //重载配置信息
    http.HandleFunc("/config/reload", func(w http.ResponseWriter, r *http.Request) {}

# 传递新metric值
// configApiRoutes registers the "/api/push" endpoint clients use to
// push new metric datapoints over HTTP.
func configApiRoutes() {
    http.HandleFunc("/api/push", api_push_datapoints)
}

# 统计API接口
func configProcHttpRoutes() {
    //查看所有统计信息
    http.HandleFunc("/counter/all", func(w http.ResponseWriter, r *http.Request) {})

    //旧统计接口已弃用
    http.HandleFunc("/statistics/all", func(w http.ResponseWriter, r *http.Request) {})

    //查看最小上报周期
    http.HandleFunc("/proc/step", func(w http.ResponseWriter, r *http.Request) {})

    //trace信息"RecvDataTrace"    
    http.HandleFunc("/trace/", func(w http.ResponseWriter, r *http.Request) {})

    // filter信息"RecvDataFilter"
    http.HandleFunc("/filter/", func(w http.ResponseWriter, r *http.Request) {}

# Debug接口路由
func configDebugHttpRoutes() {
    //debug查看连接池信息 ("judge" OR "graph")
    http.HandleFunc("/debug/connpool/", func(w http.ResponseWriter, r *http.Request) {}

sender.Start() 初始化数据发送服务

# 初始化数据发送服务
// Start initializes the sending side: connection pools, per-backend
// send queues and the consistent-hash node rings, then launches the
// forwarding tasks and the periodic statistics crons.
func Start() {
    // Clamp the minimum step to a sane default.
    MinStep = g.Config().MinStep
    if MinStep < 1 {
        MinStep = 30 // default: 30s
    }

    initConnPools()  // backend connection pools
    initSendQueues() // bounded send queues (safe lists)
    initNodeRings()  // consistent-hash rings for backend distribution

    // The send tasks depend on the structures initialized above.
    startSendTasks()  // forward data to the backends
    startSenderCron() // periodic statistics collection/logging
    log.Println("send.Start, ok")
}

  • initConnPools() 初始化后端连接池
##  初始化后端连接池
// initConnPools builds the RPC/TSDB connection pools for every backend
// instance found in the global configuration.
func initConnPools() {
    cfg := g.Config()

    // Judge: collect the unique set of judge instances, then build the
    // RPC pools from the cluster configuration.
    judgeSet := nset.NewStringSet()
    for _, instance := range cfg.Judge.Cluster {
        judgeSet.Add(instance)
    }
    JudgeConnPools = backend.CreateSafeRpcConnPools(cfg.Judge.MaxConns, cfg.Judge.MaxIdle,
        cfg.Judge.ConnTimeout, cfg.Judge.CallTimeout, judgeSet.ToSlice())

    // Tsdb: a single pool helper, only when the backend is enabled.
    if cfg.Tsdb.Enabled {
        TsdbConnPoolHelper = backend.NewTsdbConnPoolHelper(cfg.Tsdb.Address, cfg.Tsdb.MaxConns, cfg.Tsdb.MaxIdle, cfg.Tsdb.ConnTimeout, cfg.Tsdb.CallTimeout)
    }

    // Graph: gather every address of every cluster node, then build the
    // RPC pools.
    graphSet := nset.NewSafeSet()
    for _, nitem := range cfg.Graph.ClusterList {
        for _, addr := range nitem.Addrs {
            graphSet.Add(addr)
        }
    }
    GraphConnPools = backend.CreateSafeRpcConnPools(cfg.Graph.MaxConns, cfg.Graph.MaxIdle,
        cfg.Graph.ConnTimeout, cfg.Graph.CallTimeout, graphSet.ToSlice())
}
  • initSendQueues() 初始化各后端服务器及集群的发送队列

## 初始化各后端服务器及集群的发送队列
// initSendQueues allocates one bounded send queue per judge node, one
// per graph (node, address) replica, and a single tsdb queue when the
// tsdb backend is enabled.
func initSendQueues() {
    cfg := g.Config()

    // One queue per judge cluster node.
    for node := range cfg.Judge.Cluster {
        JudgeQueues[node] = nlist.NewSafeListLimited(DefaultSendQueueMaxSize)
    }

    // One queue per graph (node, address) pair.
    for node, nitem := range cfg.Graph.ClusterList {
        for _, addr := range nitem.Addrs {
            GraphQueues[node+addr] = nlist.NewSafeListLimited(DefaultSendQueueMaxSize)
        }
    }

    // A single tsdb queue, only when enabled.
    if cfg.Tsdb.Enabled {
        TsdbQueue = nlist.NewSafeListLimited(DefaultSendQueueMaxSize)
    }
}
  • initNodeRings() 初始化一致性Hash环
## 初始化一致性Hashing环(JudgeNodeRing,GraphNodeRing)
## Data->Node映射
## 了解基本知识"一致性Hashing",
// initNodeRings builds the consistent-hash rings (JudgeNodeRing,
// GraphNodeRing) that map a data item's primary key to a backend node.
// Replica counts come from the configuration; the ring members are the
// keys of the respective cluster maps.
func initNodeRings() {
    cfg := g.Config()

    JudgeNodeRing = rings.NewConsistentHashNodesRing(int32(cfg.Judge.Replicas), cutils.KeysOfMap(cfg.Judge.Cluster))
    GraphNodeRing = rings.NewConsistentHashNodesRing(int32(cfg.Graph.Replicas), cutils.KeysOfMap(cfg.Graph.Cluster))
}

  • startSendTasks() 发送数据与控制
// startSendTasks launches one background forwarding goroutine per judge
// node, one per graph (node, address) replica, and one for tsdb when it
// is enabled. Per-backend concurrency limits are clamped to at least 1.
func startSendTasks() {
    cfg := g.Config()

    // Semaphore sizes for each backend; never below 1.
    atLeastOne := func(n int) int {
        if n < 1 {
            return 1
        }
        return n
    }
    judgeConcurrent := atLeastOne(cfg.Judge.MaxConns)
    graphConcurrent := atLeastOne(cfg.Graph.MaxConns)
    tsdbConcurrent := atLeastOne(cfg.Tsdb.MaxConns)

    // Judge forwarders: one goroutine per cluster node.
    for node := range cfg.Judge.Cluster {
        go forward2JudgeTask(JudgeQueues[node], node, judgeConcurrent)
    }

    // Graph forwarders: one goroutine per (node, address) replica.
    for node, nitem := range cfg.Graph.ClusterList {
        for _, addr := range nitem.Addrs {
            go forward2GraphTask(GraphQueues[node+addr], node, addr, graphConcurrent)
        }
    }

    // Tsdb forwarder.
    if cfg.Tsdb.Enabled {
        go forward2TsdbTask(tsdbConcurrent)
    }
}

// forward2JudgeTask drains the given judge send queue in batches,
// forever, and forwards each batch over the RPC connection pool to the
// judge instance configured for `node`. Sends run in goroutines bounded
// by a semaphore of size `concurrent`; each send is retried up to 3
// times before being counted as failed.
func forward2JudgeTask(Q *list.SafeListLimited, node string, concurrent int) {
    batch := g.Config().Judge.Batch // max items per send
    addr := g.Config().Judge.Cluster[node]
    sema := nsema.NewSemaphore(concurrent)

    for {
        items := Q.PopBackBy(batch)  // pop up to batch items from the back
        count := len(items)
        if count == 0 {
            time.Sleep(DefaultSendTaskSleepInterval) // idle: wait before polling again
            continue
        }

        judgeItems := make([]*cmodel.JudgeItem, count)
        for i := 0; i < count; i++ {
            judgeItems[i] = items[i].(*cmodel.JudgeItem)
        }

        // Bounded concurrency: acquire before spawning the sender goroutine,
        // released by the goroutine itself when the send completes.
        sema.Acquire()
        go func(addr string, judgeItems []*cmodel.JudgeItem, count int) {
            defer sema.Release()

            resp := &cmodel.SimpleRpcResponse{}
            var err error
            sendOk := false
            for i := 0; i < 3; i++ { // retry up to 3 times
                err = JudgeConnPools.Call(addr, "Judge.Send", judgeItems, resp) // RPC send to judge
                if err == nil {
                    sendOk = true
                    break
                }
                time.Sleep(time.Millisecond * 10) // back off between retries
            }

            // Success/failure counters.
            if !sendOk {
                log.Printf("send judge %s:%s fail: %v", node, addr, err)
                proc.SendToJudgeFailCnt.IncrBy(int64(count))
            } else {
                proc.SendToJudgeCnt.IncrBy(int64(count))
            }
        }(addr, judgeItems, count)
    }
}


// JudgeItem is the sample payload forwarded to the judge component for
// alarm evaluation (built from MetaData in Push2JudgeSendQueue).
type JudgeItem struct {
    Endpoint  string            `json:"endpoint"`  // source host/entity
    Metric    string            `json:"metric"`    // metric name
    Value     float64           `json:"value"`     // sample value
    Timestamp int64             `json:"timestamp"` // step-aligned unix timestamp (see alignTs)
    JudgeType string            `json:"judgeType"` // counter type, copied from MetaData.CounterType
    Tags      map[string]string `json:"tags"`      // metric tags
}



// forward2GraphTask drains the given graph send queue in batches,
// forever, and forwards each batch over the RPC connection pool to the
// graph instance at addr. Sends run in goroutines bounded by a
// semaphore of size `concurrent`; each send is retried up to 3 times
// before being counted as failed.
func forward2GraphTask(Q *list.SafeListLimited, node string, addr string, concurrent int) {
    batch := g.Config().Graph.Batch // max items per send
    sema := nsema.NewSemaphore(concurrent)

    for {
        items := Q.PopBackBy(batch)
        count := len(items)
        if count == 0 {
            time.Sleep(DefaultSendTaskSleepInterval)
            continue
        }

        graphItems := make([]*cmodel.GraphItem, count)
        for i := 0; i < count; i++ {
            graphItems[i] = items[i].(*cmodel.GraphItem)
        }

        // Bounded concurrency: acquire before spawning the sender goroutine,
        // released by the goroutine itself when the send completes.
        sema.Acquire()
        go func(addr string, graphItems []*cmodel.GraphItem, count int) {
            defer sema.Release()

            resp := &cmodel.SimpleRpcResponse{}
            var err error
            sendOk := false
            for i := 0; i < 3; i++ { // retry up to 3 times
                err = GraphConnPools.Call(addr, "Graph.Send", graphItems, resp)  // RPC send to graph
                if err == nil {
                    sendOk = true
                    break
                }
                time.Sleep(time.Millisecond * 10) // back off between retries
            }

            // Success/failure counters.
            if !sendOk {
                log.Printf("send to graph %s:%s fail: %v", node, addr, err)
                proc.SendToGraphFailCnt.IncrBy(int64(count))
            } else {
                proc.SendToGraphCnt.IncrBy(int64(count))
            }
        }(addr, graphItems, count)
    }
}

// GraphItem is the sample payload forwarded to the graph component for
// RRD storage. DsType is the RRD datasource type: GAUGE|COUNTER|DERIVE.
type GraphItem struct {
    Endpoint  string            `json:"endpoint"`  // source host/entity
    Metric    string            `json:"metric"`    // metric name
    Tags      map[string]string `json:"tags"`      // metric tags
    Value     float64           `json:"value"`     // sample value
    Timestamp int64             `json:"timestamp"` // step-aligned unix timestamp (see convert2GraphItem)
    DsType    string            `json:"dstype"`    // RRD datasource type
    Step      int               `json:"step"`      // reporting period in seconds, clamped to >= MinStep
    Heartbeat int               `json:"heartbeat"` // set to 2*Step by convert2GraphItem
    Min       string            `json:"min"`       // RRD lower bound ("U" = unknown)
    Max       string            `json:"max"`       // RRD upper bound ("U" = unknown)
}


// forward2TsdbTask drains TsdbQueue in batches, forever, serializes
// each batch into newline-separated TSDB lines, and sends it through
// TsdbConnPoolHelper. Sends run in goroutines bounded by a semaphore of
// size `concurrent`, retried up to cfg.Tsdb.MaxRetry times.
func forward2TsdbTask(concurrent int) {
    batch := g.Config().Tsdb.Batch // max items per send
    retry := g.Config().Tsdb.MaxRetry  // max send attempts
    sema := nsema.NewSemaphore(concurrent)  // concurrency limiter (semaphore)

    for {
        items := TsdbQueue.PopBackBy(batch)
        if len(items) == 0 {
            time.Sleep(DefaultSendTaskSleepInterval)
            continue
        }
        // Bounded concurrency: acquire before spawning the sender goroutine,
        // released by the goroutine itself when the send completes.
        sema.Acquire()
        go func(itemList []interface{}) {
            defer sema.Release()

            var tsdbBuffer bytes.Buffer
            // Serialize the batch: one TSDB line per item.
            for i := 0; i < len(itemList); i++ {
                tsdbItem := itemList[i].(*cmodel.TsdbItem)
                tsdbBuffer.WriteString(tsdbItem.TsdbString()) // line-format the item
                tsdbBuffer.WriteString("\n") // line separator
            }

            var err error
            for i := 0; i < retry; i++ {
                err = TsdbConnPoolHelper.Send(tsdbBuffer.Bytes()) // push to TSDB
                
                // Count the whole batch on success.
                if err == nil {
                    proc.SendToTsdbCnt.IncrBy(int64(len(itemList)))
                    break
                }
                time.Sleep(100 * time.Millisecond)
            }
            
            // Count and log the failure after exhausting all retries.
            if err != nil {
                proc.SendToTsdbFailCnt.IncrBy(int64(len(itemList)))
                log.Println(err)
                return
            }
        }(items)
    }
}

  • startSenderCron() 发送相关数据统计
## 后台线程,发送统计数据
// startSenderCron launches the background statistics crons: one that
// refreshes the send-queue size counters and one that logs the graph
// connection-pool statistics.
func startSenderCron() {
    go startProcCron()            // send-queue size statistics
    go startLogCron()             // connection-pool statistics logging
}
// startProcCron refreshes the send-queue size counters every
// DefaultProcCronPeriod, forever.
func startProcCron() {
    for {
        time.Sleep(DefaultProcCronPeriod)
        refreshSendingCacheSize()  // update queue-size statistics
    }
}
// startLogCron logs the graph connection-pool statistics every
// DefaultLogCronPeriod, forever.
func startLogCron() {
    for {
        time.Sleep(DefaultLogCronPeriod)
        logConnPoolsProc()       // log graph connection-pool statistics
    }
}
// refreshSendingCacheSize records the total number of items buffered in
// the judge and graph send queues into their respective proc counters.
func refreshSendingCacheSize() {
    proc.JudgeQueuesCnt.SetCnt(calcSendCacheSize(JudgeQueues))
    proc.GraphQueuesCnt.SetCnt(calcSendCacheSize(GraphQueues))
}
// calcSendCacheSize sums the lengths of all non-nil queues in mapList.
// Fix: the original loop variable was named "list", shadowing the
// imported "list" package that appears in the parameter type; renamed
// to "q" to remove the shadow.
func calcSendCacheSize(mapList map[string]*list.SafeListLimited) int64 {
    var cnt int64 = 0
    for _, q := range mapList {
        if q != nil {
            cnt += int64(q.Len())
        }
    }
    return cnt
}
// logConnPoolsProc logs the graph connection pools' statistics, one
// pool per line.
func logConnPoolsProc() {
    log.Printf("connPools proc: \n%v", strings.Join(GraphConnPools.Proc(), "\n"))
}


// TsdbItem is the sample payload sent to TSDB; the source endpoint is
// carried as the "endpoint" tag (see convert2TsdbItem).
type TsdbItem struct {
    Metric    string            `json:"metric"`    // metric name
    Tags      map[string]string `json:"tags"`      // metric tags, including "endpoint"
    Value     float64           `json:"value"`     // sample value
    Timestamp int64             `json:"timestamp"` // unix timestamp
}


receiver.Start() 接收数据,入列队

// Start launches the two data-ingestion listeners — the JSON-RPC server
// and the telnet-style TCP socket — each in its own goroutine.
func Start() {
    go rpc.StartRpc()
    go socket.StartSocket()
}

RPC 服务启动与监听处理

// StartRpc serves the JSON-RPC listener on the configured address
// (default port 8433) and handles each accepted connection in its own
// goroutine. No-op when RPC is disabled in the configuration; any
// listen/registration failure aborts the process.
func StartRpc() {
    if !g.Config().Rpc.Enabled {
        return
    }

    addr := g.Config().Rpc.Listen
    tcpAddr, err := net.ResolveTCPAddr("tcp", addr) // default port 8433
    if err != nil {
        log.Fatalf("net.ResolveTCPAddr fail: %s", err)
    }

    listener, err := net.ListenTCP("tcp", tcpAddr)
    if err != nil {
        log.Fatalf("listen %s fail: %s", addr, err)
    } else {
        log.Println("rpc listening", addr)
    }

    server := rpc.NewServer()   // RPC server
    // Fix: the original ignored Register's error; a failed registration
    // would leave the server silently serving nothing.
    if err := server.Register(new(Transfer)); err != nil {
        log.Fatalf("rpc: register Transfer fail: %s", err)
    }

    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Println("listener.Accept occur error:", err)
            continue
        }
        // One JSON-RPC codec per connection.
        go server.ServeCodec(jsonrpc.NewServerCodec(conn))
    }
}

// Ping implements RPC "Transfer.Ping": a no-op health check that always
// succeeds.
// NOTE(review): the receiver is named "this", inconsistent with Update's
// "t" and against Go receiver-naming convention.
func (this *Transfer) Ping(req cmodel.NullRpcRequest, resp *cmodel.SimpleRpcResponse) error {
    return nil                    
}

// Update implements RPC "Transfer.Update": it hands the received batch
// of metric values to RecvMetricValues, tagging the origin as "rpc".
func (t *Transfer) Update(args []*cmodel.MetricValue, reply *cmodel.TransferResponse) error {
    return RecvMetricValues(args, reply, "rpc") // validate, normalize and enqueue
}


// RecvMetricValues validates and normalizes a batch of incoming metric
// values, counting rejects in reply.Invalid, and pushes the valid items
// onto the graph/judge/tsdb send queues according to the configuration.
// `from` tags the origin ("rpc" or "http") for the receive counters.
// On return, reply carries "ok", the total received count and the
// processing latency in milliseconds.
func RecvMetricValues(args []*cmodel.MetricValue, reply *cmodel.TransferResponse, from string) error {
    start := time.Now()   // start of processing; used for reply.Latency
    reply.Invalid = 0

    items := []*cmodel.MetaData{}
    for _, v := range args {
        if v == nil {
            reply.Invalid += 1
            continue
        }

        // Legacy quirk: old agents reported metric=kernel.hostname with a
        // string value, which is no longer supported — hard-coded filter.
        if v.Metric == "kernel.hostname" {
            reply.Invalid += 1
            continue
        }

        // Metric and endpoint are mandatory.
        if v.Metric == "" || v.Endpoint == "" {
            reply.Invalid += 1
            continue
        }
          
        // Counter type must be one of the rrdtool types:
        // COUNTER / GAUGE / DERIVE.
        if v.Type != g.COUNTER && v.Type != g.GAUGE && v.Type != g.DERIVE {
            reply.Invalid += 1
            continue
        }

        if v.Value == "" {
            reply.Invalid += 1
            continue
        }

        // Step (reporting period) must be positive.
        if v.Step <= 0 {
            reply.Invalid += 1
            continue
        }

        // Size bound on metric+tags (510). NOTE(review): what len(v.Tags)
        // measures depends on Tags' declared type, which is not visible here.
        if len(v.Metric)+len(v.Tags) > 510 {
            reply.Invalid += 1
            continue
        }

        // TODO could be more elegant: clamp absent/bogus timestamps to "now".
        now := start.Unix()
        if v.Timestamp <= 0 || v.Timestamp > now*2 {
            v.Timestamp = now
        }

        fv := &cmodel.MetaData{
            Metric:      v.Metric,
            Endpoint:    v.Endpoint,
            Timestamp:   v.Timestamp,
            Step:        v.Step,
            CounterType: v.Type,
            Tags:        cutils.DictedTagstring(v.Tags), // TODO limit the number of tag pairs
        }

        valid := true
        var vv float64
        var err error
         
        // Value arrives as an interface; coerce string/float64/int64 to
        // float64, rejecting anything else.
        switch cv := v.Value.(type) {
        case string:
            vv, err = strconv.ParseFloat(cv, 64)
            if err != nil {
                valid = false
            }
        case float64:
            vv = cv
        case int64:
            vv = float64(cv)
        default:
            valid = false
        }

        if !valid {
            reply.Invalid += 1
            continue
        }

        fv.Value = vv
        items = append(items, fv)
    }

    // Receive counters (statistics).
    cnt := int64(len(items))
    proc.RecvCnt.IncrBy(cnt)
    if from == "rpc" {
        proc.RpcRecvCnt.IncrBy(cnt)  
    } else if from == "http" {
        proc.HttpRecvCnt.IncrBy(cnt)
    }

    cfg := g.Config()

    if cfg.Graph.Enabled {
        sender.Push2GraphSendQueue(items)  // enqueue for graph
    }

    if cfg.Judge.Enabled {
        sender.Push2JudgeSendQueue(items)  // enqueue for judge
    }

    if cfg.Tsdb.Enabled {
        sender.Push2TsdbSendQueue(items)   // enqueue for tsdb
    }

    reply.Message = "ok"
    reply.Total = len(args)
    reply.Latency = (time.Now().UnixNano() - start.UnixNano()) / 1000000 // ms

    return nil
}


// Push2JudgeSendQueue routes each item to a judge node chosen by the
// consistent-hash ring and pushes a JudgeItem (with its timestamp
// aligned to the clamped step) onto that node's send queue. Rejected
// pushes are counted as drops.
func Push2JudgeSendQueue(items []*cmodel.MetaData) {
    for _, item := range items {
        pk := item.PK()
        node, err := JudgeNodeRing.GetNode(pk) // consistent hashing picks the node
        if err != nil {
            log.Println("E:", err)
            continue
        }

        // Align the timestamp to the step, clamped to MinStep.
        step := int(item.Step)
        if step < MinStep {
            step = MinStep
        }
        ts := alignTs(item.Timestamp, int64(step))
        
        // Build the judge payload.
        judgeItem := &cmodel.JudgeItem{
            Endpoint:  item.Endpoint,
            Metric:    item.Metric,
            Value:     item.Value,
            Timestamp: ts,
            JudgeType: item.CounterType,
            Tags:      item.Tags,
        }
        Q := JudgeQueues[node]
        isSuccess := Q.PushFront(judgeItem)  // push onto the queue front

        // Count drops when the bounded queue rejects the push.
        if !isSuccess {
            proc.SendToJudgeDropCnt.Incr()
        }
    }
}

// Push2GraphSendQueue converts each item into a GraphItem, records
// trace/filter statistics, routes it to a graph node chosen by the
// consistent-hash ring, and pushes a copy onto the queue of every
// address belonging to that node. Items with any rejected push are
// counted as drops.
func Push2GraphSendQueue(items []*cmodel.MetaData) {
    cfg := g.Config().Graph

    for _, item := range items {
        graphItem, err := convert2GraphItem(item) // reshape for rrdtool
        if err != nil {
            log.Println("E:", err)
            continue
        }
        pk := item.PK()

        // Statistics: placed here for efficiency, so trace/filter only
        // work when graph is enabled.
        proc.RecvDataTrace.Trace(pk, item)
        proc.RecvDataFilter.Filter(pk, item.Value, item)

        node, err := GraphNodeRing.GetNode(pk) // consistent hashing picks the node
        if err != nil {
            log.Println("E:", err)
            continue
        }

        // Push the item onto every address queue of the chosen node.
        cnode := cfg.ClusterList[node]
        errCnt := 0
        for _, addr := range cnode.Addrs {
            Q := GraphQueues[node+addr]
            if !Q.PushFront(graphItem) {      // push onto the queue front
                errCnt += 1
            }
        }

        // Count a drop when any of the pushes was rejected.
        if errCnt > 0 {
            proc.SendToGraphDropCnt.Incr()
        }
    }
}

// convert2GraphItem reshapes a MetaData sample into the GraphItem form
// rrdtool expects: the step is clamped to MinStep, the heartbeat is
// twice the step, the counter type is mapped onto an RRD datasource
// type with min/max bounds, and the timestamp is aligned to the step.
// An unsupported counter type yields an error.
func convert2GraphItem(d *cmodel.MetaData) (*cmodel.GraphItem, error) {
    item := &cmodel.GraphItem{
        Endpoint:  d.Endpoint,
        Metric:    d.Metric,
        Tags:      d.Tags,
        Timestamp: d.Timestamp,
        Value:     d.Value,
        Step:      int(d.Step),
    }

    // Clamp the step and derive the heartbeat from it.
    if item.Step < MinStep {
        item.Step = MinStep
    }
    item.Heartbeat = item.Step * 2

    // Map the counter type to an RRD datasource type; COUNTER and DERIVE
    // both become DERIVE with a lower bound of 0.
    switch d.CounterType {
    case g.GAUGE:
        item.DsType = d.CounterType
        item.Min = "U"
        item.Max = "U"
    case g.COUNTER, g.DERIVE:
        item.DsType = g.DERIVE
        item.Min = "0"
        item.Max = "U"
    default:
        return item, fmt.Errorf("not_supported_counter_type")
    }

    // Align the timestamp to the step: ts - ts%step.
    item.Timestamp = alignTs(item.Timestamp, int64(item.Step))

    return item, nil
}

// Push2TsdbSendQueue converts each item to the tsdb wire format and
// pushes it onto the single tsdb send queue; rejected pushes are
// counted as drops.
func Push2TsdbSendQueue(items []*cmodel.MetaData) {
    for _, item := range items {
        tsdbItem := convert2TsdbItem(item)
        isSuccess := TsdbQueue.PushFront(tsdbItem)

        // Count drops when the bounded queue rejects the push.
        if !isSuccess {
            proc.SendToTsdbDropCnt.Incr()
        }
    }
}

// convert2TsdbItem maps a MetaData sample to the TsdbItem wire format,
// copying every tag and injecting the source endpoint as the
// "endpoint" tag.
func convert2TsdbItem(d *cmodel.MetaData) *cmodel.TsdbItem {
    out := cmodel.TsdbItem{
        Metric:    d.Metric,
        Timestamp: d.Timestamp,
        Value:     d.Value,
        Tags:      make(map[string]string, len(d.Tags)+1),
    }

    // Copy the tags, then add the endpoint as its own tag.
    for k, v := range d.Tags {
        out.Tags[k] = v
    }
    out.Tags["endpoint"] = d.Endpoint

    return &out
}

// alignTs floors ts down to the nearest multiple of period.
func alignTs(ts int64, period int64) int64 {
    remainder := ts % period
    return ts - remainder
}

TCP socket服务启动与监听数据转发

// StartSocket serves the telnet-style TCP listener on the configured
// address (default port 4444) and spawns socketTelnetHandle for every
// accepted connection. No-op when the socket receiver is disabled; any
// listen failure aborts the process.
func StartSocket() {
    if !g.Config().Socket.Enabled {
        return
    }

    addr := g.Config().Socket.Listen  // default port 4444
    tcpAddr, err := net.ResolveTCPAddr("tcp", addr) 
    if err != nil {
        log.Fatalf("net.ResolveTCPAddr fail: %s", err)
    }

    listener, err := net.ListenTCP("tcp", tcpAddr)
    if err != nil {
        log.Fatalf("listen %s fail: %s", addr, err)
    } else {
        log.Println("socket listening", addr)
    }

    defer listener.Close()
     
    for {  
        conn, err := listener.Accept()    
        if err != nil {
            log.Println("listener.Accept occur error:", err)
            continue
        }

        go socketTelnetHandle(conn)   // parse, normalize and forward the data
    }
}

## 数据处理格式化与向后端转发
// socketTelnetHandle reads newline-delimited commands from the
// connection until EOF, a read timeout, or a "quit" line. Every
// "update ..." command is parsed into a MetaData item; the collected
// items are then counted and pushed onto the graph/judge send queues
// as enabled by the configuration.
func socketTelnetHandle(conn net.Conn) {
    defer conn.Close()

    items := []*cmodel.MetaData{}  // collected samples
    buf := bufio.NewReader(conn)

    cfg := g.Config()
    timeout := time.Duration(cfg.Socket.Timeout) * time.Second

    for {
        // Refresh the per-line read deadline on every iteration.
        conn.SetReadDeadline(time.Now().Add(timeout))
        line, err := buf.ReadString('\n')   // read one line
        if err != nil {
            break
        }

        line = strings.Trim(line, "\n")    

        if line == "quit" {           // client-requested disconnect
            break   
        }

        if line == "" {
            continue
        }

        t := strings.Fields(line)    // split on whitespace
        if len(t) < 2 {
            continue
        }

        cmd := t[0]

        // Only "update" commands carry data; everything else is ignored.
        if cmd != "update" {
            continue
        }

        item, err := convertLine2MetaData(t[1:]) // parse the fields into MetaData
        if err != nil {
            continue
        }

        items = append(items, item)   // collect the sample
    }
    
    // Receive counters.
    proc.SocketRecvCnt.IncrBy(int64(len(items)))
    proc.RecvCnt.IncrBy(int64(len(items)))

    if cfg.Graph.Enabled {
        sender.Push2GraphSendQueue(items)  // enqueue for graph
    }

    if cfg.Judge.Enabled {
        sender.Push2JudgeSendQueue(items)  // enqueue for judge
    }

    return

}

// MetaData is the normalized internal form of one received metric
// sample; it is fanned out from here to the judge/graph/tsdb send
// queues.
type MetaData struct {
    Metric      string            `json:"metric"`      // metric name
    Endpoint    string            `json:"endpoint"`    // source host/entity
    Timestamp   int64             `json:"timestamp"`   // unix timestamp
    Step        int64             `json:"step"`        // reporting period, seconds
    Value       float64           `json:"value"`       // sample value
    CounterType string            `json:"counterType"` // GAUGE/COUNTER/DERIVE
    Tags        map[string]string `json:"tags"`        // metric tags
}

扩展知识

相关文章

网友评论

      本文标题:OpenFalcon源码分析(Transfer组件)

      本文链接:https://www.haomeiwen.com/subject/lobhnftx.html