美文网首页
OpenFalcon源码分析(Judge组件)

OpenFalcon源码分析(Judge组件)

作者: Xiao_Yang | 来源:发表于2018-09-21 09:34 被阅读0次

    Judge版本

    VERSION = "2.0.2"

    Judge组件功能

    Judge是用于判断是否触发报警条件的组件。
    Transfer的数据不但要转送到Graph来存储并绘图,还要转送到Judge用于报警判断。Judge先从hbs获取所有策略列表,静等Transfer的数据转发。
    每收到一条Transfer转发过来的数据,立即找到这条数据关联的Strategy、Expression,然后做阈值判断。【官方描述】

    Judge组件逻辑图

    Judge逻辑图

    Portal关于报警策略与表达式定义操作说明书

    操作文档

    main主入口分析

    // main is the Judge entry point. It parses flags, loads the configuration,
    // initializes the Redis pool, the HBS client and the in-memory history
    // store, starts the HTTP and RPC servers plus the periodic cron jobs in
    // their own goroutines, then blocks forever.
    func main() {
        cfg := flag.String("c", "cfg.json", "configuration file")
        version := flag.Bool("v", false, "show version")
        flag.Parse()
    
        // -v only prints the version and exits.
        if *version {
            fmt.Println(g.VERSION)
            os.Exit(0)
        }
    
        g.ParseConfig(*cfg) // parse the global configuration file
    
        g.InitRedisConnPool() // initialize the Redis connection pool
        g.InitHbsClient()     // initialize the HBS RPC client
     
        store.InitHistoryBigMap() // pre-create the BigMap buckets that cache incoming data
    
        go http.Start() // HTTP API server
        go rpc.Start()  // RPC server (Transfer pushes data here)
    
        go cron.SyncStrategies() // periodic job: sync strategies and expressions from HBS
        go cron.CleanStale()     // periodic job: purge stale history data from the BigMap
    
        // Block forever; all work happens in the goroutines started above.
        select {}
    }
    
    

    g.ParseConfig(*cfg) 初始化全局配置文件

    // GlobalConfig mirrors the JSON configuration file loaded by ParseConfig.
    type GlobalConfig struct {
        Debug     bool         `json:"debug"`     // when true, strategies of DebugHost are dumped during HBS sync
        DebugHost string       `json:"debugHost"` // host whose strategies are dumped when Debug is on
        Remain    int          `json:"remain"`    // max number of history points kept per series
        Http      *HttpConfig  `json:"http"`
        Rpc       *RpcConfig   `json:"rpc"`
        Hbs       *HbsConfig   `json:"hbs"`
        Alarm     *AlarmConfig `json:"alarm"`
    }
    
    // Config returns the currently active configuration. The pointer is read
    // under configLock so it can be called concurrently with ParseConfig.
    func Config() *GlobalConfig {
        configLock.RLock()
        current := config
        configLock.RUnlock()
        return current
    }
    
    # 解析全局配置
    func ParseConfig(cfg string) {
        if cfg == "" {   //参数配置
            log.Fatalln("use -c to specify configuration file")
        }
    
        if !file.IsExist(cfg) {  //是否存在
            log.Fatalln("config file:", cfg, "is not existent")
        }
    
        ConfigFile = cfg
    
        configContent, err := file.ToTrimString(cfg) //字符串
        if err != nil {
            log.Fatalln("read config file:", cfg, "fail:", err)
        }
    
        var c GlobalConfig
        err = json.Unmarshal([]byte(configContent), &c) //反序列为结构
        if err != nil {
            log.Fatalln("parse config file:", cfg, "fail:", err)
        }
    
        configLock.Lock()
        defer configLock.Unlock()
    
        config = &c
    
        log.Println("read config file:", cfg, "successfully")
    }
    
    
    
    

    g.InitRedisConnPool() 初始化Redis连接池

    // InitRedisConnPool builds RedisConnPool from the Alarm.Redis settings.
    // When alarm is disabled it returns without creating a pool.
    func InitRedisConnPool() {
        alarmCfg := Config().Alarm
        if !alarmCfg.Enabled {
            return
        }
    
        redisCfg := alarmCfg.Redis
        dsn := redisCfg.Dsn
        // The configured timeouts are in milliseconds.
        connTimeout := time.Millisecond * time.Duration(redisCfg.ConnTimeout)
        readTimeout := time.Millisecond * time.Duration(redisCfg.ReadTimeout)
        writeTimeout := time.Millisecond * time.Duration(redisCfg.WriteTimeout)
    
        RedisConnPool = &redis.Pool{
            MaxIdle:     redisCfg.MaxIdle,
            IdleTimeout: 240 * time.Second,
            Dial: func() (redis.Conn, error) {
                conn, err := redis.DialTimeout("tcp", dsn, connTimeout, readTimeout, writeTimeout)
                if err != nil {
                    return nil, err
                }
                return conn, nil
            },
            TestOnBorrow: PingRedis,
        }
    }
    
    // PingRedis is the pool's TestOnBorrow hook. It sends PING on c, logs any
    // failure and returns the error (nil on success). The idle-since time t
    // is not used.
    func PingRedis(c redis.Conn, t time.Time) error {
        if _, err := c.Do("ping"); err != nil {
            log.Println("[ERROR] ping redis fail", err)
            return err
        }
        return nil
    }
    

    g.InitHbsClient() 实例化HBS客户端对象

    func InitHbsClient() {
        HbsClient = &SingleConnRpcClient{
            RpcServers: Config().Hbs.Servers,
            Timeout:    time.Duration(Config().Hbs.Timeout) * time.Millisecond,
        }
    }
    

    store.InitHistoryBigMap() 初始化内存BigMap,存放采集的历史数据

    # 创建BigMap([256]JudgeItemMap),存放采集的监控历史数据
    # [00..f] JudgeItemMap
    #  .
    #  .
    # [f0..f] JudgeItemMap
    func InitHistoryBigMap() {
        arr := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"}
        for i := 0; i < 16; i++ {
            for j := 0; j < 16; j++ {
                HistoryBigMap[arr[i]+arr[j]] = NewJudgeItemMap() 
            }
        }
    }
    
    
    // 创建与初始化JudgeItemMap -> map[string]*SafeLinkedList
    // NewJudgeItemMap returns an empty, ready-to-use JudgeItemMap.
    func NewJudgeItemMap() *JudgeItemMap {
        m := &JudgeItemMap{}
        m.M = map[string]*SafeLinkedList{}
        return m
    }
    
    // HistoryBigMap holds the per-prefix history buckets. The outer map is not
    // guarded by any lock, so it must be fully populated before concurrent use
    // starts and not written afterwards; each bucket carries its own lock.
    var HistoryBigMap = map[string]*JudgeItemMap{}
    
    // JudgeItemMap maps a series primary key to its history list; M is guarded
    // by the embedded RWMutex.
    type JudgeItemMap struct {
        sync.RWMutex
        M map[string]*SafeLinkedList
    }
    
    // SafeLinkedList is a container/list.List guarded by the embedded RWMutex.
    // The newest item is kept at the front.
    type SafeLinkedList struct {
        sync.RWMutex
        L *list.List
    }
    
    

    http.Start() HTTP API服务监听与处理

    // init registers the component's HTTP routes when the package is loaded.
    func init() {
        configCommonRoutes()  // routes shared by all components (see the HBS module)
        configInfoRoutes()    // info/query routes: strategy, expression, count, history
    }
    
    // Start runs the HTTP API server on Http.Listen when Http.Enabled is true
    // and the address is non-empty. It blocks, and terminates the process via
    // log.Fatalln once ListenAndServe returns.
    func Start() {
        if !g.Config().Http.Enabled {
            return
        }
    
        addr := g.Config().Http.Listen // listen address from the global config
        if addr == "" {
            return
        }
        s := &http.Server{
            Addr: addr,
            // Cap request headers at the stdlib default (1 MiB). The previous
            // 1<<30 limit let a single client make the server buffer up to
            // ~1 GiB of headers per connection.
            MaxHeaderBytes: http.DefaultMaxHeaderBytes,
        }
        log.Println("http listening", addr)
        log.Fatalln(s.ListenAndServe())
    }
    
    // configInfoRoutes registers the Judge info/query endpoints on the default
    // net/http mux.
    // NOTE(review): the handler bodies are empty in this listing (the article
    // elides them); as shown, every route just returns an empty response.
    func configInfoRoutes() {
        // Presumably strategies for an endpoint/metric, e.g. /strategy/lg-dinp-docker01.bj/cpu.idle
        http.HandleFunc("/strategy/", func(w http.ResponseWriter, r *http.Request) {})
    
        // Presumably expressions for a metric/tag, e.g. /expression/net.port.listen/port=22
        http.HandleFunc("/expression/", func(w http.ResponseWriter, r *http.Request) {})
    
        // Total number of entries held in the BigMap.
        http.HandleFunc("/count", func(w http.ResponseWriter, r *http.Request) {})
        // History data stored in the BigMap for a given series.
        http.HandleFunc("/history/", func(w http.ResponseWriter, r *http.Request) {})
    
    }
    
    
    

    rpc.Start() RPC服务注册与处理

    
    // Start serves the Judge RPC service (Judge.Ping, Judge.Send) on Rpc.Listen
    // when Rpc.Enabled is true. Each accepted connection is served in its own
    // goroutine; the function never returns once listening.
    func Start() {
        if !g.Config().Rpc.Enabled {
            return
        }
        addr := g.Config().Rpc.Listen
        tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
        if err != nil {
            log.Fatalf("net.ResolveTCPAddr fail: %s", err)
        }
    
        listener, err := net.ListenTCP("tcp", tcpAddr)
        if err != nil {
            log.Fatalf("listen %s fail: %s", addr, err)
        }
        log.Println("rpc listening", addr)
    
        // rpc.Register fails when the receiver exposes no suitable methods or
        // the service name is already registered. Ignoring that would leave the
        // port open while every Judge.* call fails, so treat it as fatal.
        if err := rpc.Register(new(Judge)); err != nil {
            log.Fatalf("rpc.Register Judge fail: %s", err)
        }
    
        for {
            conn, err := listener.Accept()
            if err != nil {
                log.Printf("listener.Accept occur error: %s", err)
                continue
            }
            go rpc.ServeConn(conn)
        }
    }
    
    ## RPC Judge.Ping方法
    type Judge int
    func (this *Judge) Ping(req model.NullRpcRequest, resp *model.SimpleRpcResponse) error {
        return nil
    }
    ## RPC Judge.Send方法,Transfer使用此RPC方式上传数据
    func (this *Judge) Send(items []*model.JudgeItem, resp *model.SimpleRpcResponse) error {
        remain := g.Config().Remain   //最大保留多少次历史记录,由全局配置文件定义
        // 把当前时间的计算放在最外层,是为了减少获取时间时的系统调用开销
        now := time.Now().Unix()
        for _, item := range items {
            exists := g.FilterMap.Exists(item.Metric)  //判断缓存filtermap是否存在匹配Metric相关策略,无相关策略将不缓存此数据
            if !exists {
                continue
            }
            pk := item.PrimaryKey() //生成HASH key
            store.HistoryBigMap[pk[0:2]].PushFrontAndMaintain(pk, item, remain, now)  //缓存历史数据
        }
        return nil
    }
    
    // JudgeItem is one data point forwarded by Transfer for alarm evaluation.
    type JudgeItem struct {
        Endpoint  string            `json:"endpoint"`
        Metric    string            `json:"metric"`
        Value     float64           `json:"value"`
        Timestamp int64             `json:"timestamp"`
        JudgeType string            `json:"judgeType"`
        Tags      map[string]string `json:"tags"`
    }
    
    
    • 历史数据的缓存逻辑分析
    #生成MD5 Hash值
    func (this *JudgeItem) PrimaryKey() string {
        return utils.Md5(utils.PK(this.Endpoint, this.Metric, this.Tags))
    }
    
    // Use the first two characters of the hash as the HistoryBigMap bucket key; the bucket's PushFrontAndMaintain stores the item and runs Judge.
    store.HistoryBigMap[pk[0:2]].PushFrontAndMaintain(pk, item, remain, now)
    
    #JudgeItem Map缓存和Judge计算
    func (this *JudgeItemMap) PushFrontAndMaintain(key string, val *model.JudgeItem, maxCount int, now int64) {
        //JudgeItemMap.Get(HASH)找是否存在HASH KEY项列表,如果存在Push
        //数据和校验数据有效性后Judge计算;如果不存在则基于HASH key创建
        //JudgeItemMap并Push数据。
        if linkedList, exists := this.Get(key); exists {
            needJudge := linkedList.PushFrontAndMaintain(val, maxCount)
            if needJudge {
                Judge(linkedList, val, now) //【参考Judge逻辑分析】
            }
        } else {
            NL := list.New()
            NL.PushFront(val)   //push into list
            safeList := &SafeLinkedList{L: NL}  //create safelist
            this.Set(key, safeList)    //save into JudgeItemMap[hash]
            Judge(safeList, val, now)  //【参考Judge逻辑分析】
        }
    }
    
    # @return needJudge 如果是false不需要做judge,因为新上来的数据不合法
    func (this *SafeLinkedList) PushFrontAndMaintain(v *model.JudgeItem, maxCount int) bool {
        this.Lock()
        defer this.Unlock()
    
        sz := this.L.Len()
        if sz > 0 {
            // 新push上来的数据有可能重复了(等于以前ts),或者timestamp不对(小于以前ts),这种数据要丢掉
            if v.Timestamp <= this.L.Front().Value.(*model.JudgeItem).Timestamp || v.Timestamp <= 0 {
                return false
            }
        }
    
        this.L.PushFront(v)  //最新数据放置在列表首
    
        sz++
        if sz <= maxCount {
            return true
        }
       
        //达到最大保存历史数据项条数后则清除尾部
        del := sz - maxCount
        for i := 0; i < del; i++ {
            this.L.Remove(this.L.Back())   //删除列表尾记录
        }
    
        return true
    }
    
    • Judge报警判断逻辑分析
    // Judge is the alarm evaluation entry point for a newly stored point
    // (firstItem, already at the front of L): it checks matching strategies
    // first, then matching expressions.
    func Judge(L *SafeLinkedList, firstItem *model.JudgeItem, now int64) {
        CheckStrategy(L, firstItem, now)   // strategies keyed by endpoint/metric
        CheckExpression(L, firstItem, now) // expressions keyed by metric/tag
    }
    
    ## 策略检测及发送事件
    func CheckStrategy(L *SafeLinkedList, firstItem *model.JudgeItem, now int64) {
        key := fmt.Sprintf("%s/%s", firstItem.Endpoint, firstItem.Metric)
        strategyMap := g.StrategyMap.Get()
        strategies, exists := strategyMap[key]
        if !exists {
            return
        }
    
        for _, s := range strategies {
            // 因为key仅仅是endpoint和metric,所以得到的strategies并不一定是与当前judgeItem相关的
            // 比如lg-dinp-docker01.bj配置了两个proc.num的策略,一个name=docker,一个name=agent
            // 所以此处要排除掉一部分
            related := true
            for tagKey, tagVal := range s.Tags {
                if myVal, exists := firstItem.Tags[tagKey]; !exists || myVal != tagVal {
                    related = false
                    break
                }
            }
    
            if !related {
                continue
            }
    
            judgeItemWithStrategy(L, s, firstItem, now)
        }
    }
    
    ### 判断采集数据,如果匹配策略计算条件则发送报警事件
    func judgeItemWithStrategy(L *SafeLinkedList, strategy model.Strategy, firstItem *model.JudgeItem, now int64) {
        fn, err := ParseFuncFromString(strategy.Func, strategy.Operator, strategy.RightValue)  //解析报警函数
        if err != nil {
            log.Printf("[ERROR] parse func %s fail: %v. strategy id: %d", strategy.Func, err, strategy.Id)
            return
        }
    
        historyData, leftValue, isTriggered, isEnough := fn.Compute(L)  //执行判断与计算
        if !isEnough {
            return
        }
      
       // 格式化事件信息
        event := &model.Event{
            Id:         fmt.Sprintf("s_%d_%s", strategy.Id, firstItem.PrimaryKey()),
            Strategy:   &strategy,
            Endpoint:   firstItem.Endpoint,
            LeftValue:  leftValue,
            EventTime:  firstItem.Timestamp,
            PushedTags: firstItem.Tags,
        }
    
        sendEventIfNeed(historyData, isTriggered, now, event, strategy.MaxStep)   //依据执行判断结果决定发送报警事件
    }
    
    ## 表达式检测及发送事件
    func CheckExpression(L *SafeLinkedList, firstItem *model.JudgeItem, now int64) {
        keys := buildKeysFromMetricAndTags(firstItem)
        if len(keys) == 0 {
            return
        }
    
        // expression可能会被多次重复处理,用此数据结构保证只被处理一次
        handledExpression := make(map[int]struct{})
    
        expressionMap := g.ExpressionMap.Get()
        for _, key := range keys {
            expressions, exists := expressionMap[key]  //查询是否存在采集数据相对应的表达式
            if !exists {
                continue
            }
    
            related := filterRelatedExpressions(expressions, firstItem)   //过滤与采集数据相关的Expression 
            for _, exp := range related {
                if _, ok := handledExpression[exp.Id]; ok {
                    continue
                }
                handledExpression[exp.Id] = struct{}{}
                judgeItemWithExpression(L, exp, firstItem, now)
            }
        }
    }
    
    ### 过滤与采集数据相关的Expression
    func filterRelatedExpressions(expressions []*model.Expression, firstItem *model.JudgeItem) []*model.Expression {
        size := len(expressions)
        if size == 0 {
            return []*model.Expression{}
        }
    
        exps := make([]*model.Expression, 0, size)
    
        for _, exp := range expressions {
    
            related := true
            
            itemTagsCopy := firstItem.Tags
            // 注意:exp.Tags 中可能会有一个endpoint=xxx的tag
            if _, ok := exp.Tags["endpoint"]; ok {
                itemTagsCopy = copyItemTags(firstItem)
            }
    
            for tagKey, tagVal := range exp.Tags {
                if myVal, exists := itemTagsCopy[tagKey]; !exists || myVal != tagVal {
                    related = false
                    break
                }
            }
    
            if !related {
                continue
            }
            exps = append(exps, exp)    //[]exps
        }
        return exps
    }
    
    ### 判断采集数据,如果匹配表达式计算条件则发送报警事件
    func judgeItemWithExpression(L *SafeLinkedList, expression *model.Expression, firstItem *model.JudgeItem, now int64) {
        fn, err := ParseFuncFromString(expression.Func, expression.Operator, expression.RightValue)  //解析报警函数
        if err != nil {
            log.Printf("[ERROR] parse func %s fail: %v. expression id: %d", expression.Func, err, expression.Id)
            return
        }
    
        historyData, leftValue, isTriggered, isEnough := fn.Compute(L)  //执行判断与计算
        if !isEnough {
            return
        }
    
        // 格式化事件信息
        event := &model.Event{
            Id:         fmt.Sprintf("e_%d_%s", expression.Id, firstItem.PrimaryKey()),
            Expression: expression,
            Endpoint:   firstItem.Endpoint,
            LeftValue:  leftValue,
            EventTime:  firstItem.Timestamp,
            PushedTags: firstItem.Tags,
        }
    
        sendEventIfNeed(historyData, isTriggered, now, event, expression.MaxStep) //依据执行判断结果决定发送报警事件
    
    }
    
    ## 解析操作字符串,转化为报警函数
    func ParseFuncFromString(str string, operator string, rightValue float64) (fn Function, err error) {
        if str == "" {
            return nil, fmt.Errorf("func can not be null!")
        }
        idx := strings.Index(str, "#") //以#为定位符
        args, err := atois(str[idx+1 : len(str)-1])  //字位符后为参数
        if err != nil {
            return nil, err
        }
    
        switch str[:idx-1] {   //定位符前为函数名
        case "max":
            fn = &MaxFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
        case "min":
            fn = &MinFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
        case "all":
            fn = &AllFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
        case "sum":
            fn = &SumFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
        case "avg":
            fn = &AvgFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
        case "diff":
            fn = &DiffFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
        case "pdiff":
            fn = &PDiffFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
        case "lookup":
            fn = &LookupFunction{Num: args[0], Limit: args[1], Operator: operator, RightValue: rightValue}
        default:
            err = fmt.Errorf("not_supported_method")
        }
        return
    }
    
    // sendEventIfNeed decides whether to emit event, based on the evaluation
    // result and the last event recorded for event.Id:
    //   - triggered, with no previous event or a previous OK event:
    //     emit PROBLEM with CurrentStep=1, unless maxStep is 0 (alarm muted);
    //   - triggered, with a previous PROBLEM event: emit PROBLEM with
    //     CurrentStep+1, but only while CurrentStep < maxStep, the oldest point
    //     in historyData is newer than the previous event, and at least
    //     Alarm.MinInterval seconds have passed since the previous event;
    //   - not triggered, with a previous PROBLEM event: emit OK.
    // historyData must be non-empty; callers only get here after Compute
    // reported enough data.
    func sendEventIfNeed(historyData []*model.HistoryData, isTriggered bool, now int64, event *model.Event, maxStep int) {
        lastEvent, exists := g.LastEvents.Get(event.Id)
        if isTriggered {
            event.Status = "PROBLEM"
            // Status is only ever "PROBLEM" or "OK"; the first byte tells them apart.
            if !exists || lastEvent.Status[0] == 'O' {
                // Threshold crossed and no outstanding alarm: start a new PROBLEM event.
                event.CurrentStep = 1
    
                // Some users configure a max alarm count of 0, which effectively
                // mutes the rule, so check for it.
                if maxStep == 0 {
                    return
                }
    
                sendEvent(event)  // emit the event
                return
            }
    
            // Reaching here means the previous event is PROBLEM.
            if lastEvent.CurrentStep >= maxStep {
                // Already alarmed the maximum number of times; stay quiet.
                return
            }
    
            if historyData[len(historyData)-1].Timestamp <= lastEvent.EventTime {
                // Points that already produced an alarm must not be reused,
                // otherwise it would alarm once a minute. Checking the last
                // historyData entry is enough because it is the oldest.
                return
            }
    
            if now-lastEvent.EventTime < g.Config().Alarm.MinInterval {
                // Rate limit: at least MinInterval seconds between two alarms.
                return
            }
    
            event.CurrentStep = lastEvent.CurrentStep + 1
            sendEvent(event)
        } else {
            // Not triggered: report recovery (OK) only if the last event was PROBLEM.
            if exists && lastEvent.Status[0] == 'P' {
                event.Status = "OK"          // transition to OK
                event.CurrentStep = 1
                sendEvent(event)  // emit the event
            }
        }
    }
    
    ### sendEvent将事件保存至Redis(预警的异步机制)
    func sendEvent(event *model.Event) {
        // update last event
        g.LastEvents.Set(event.Id, event)  //事件缓存
    
        bs, err := json.Marshal(event)     //Json序列化事件
        if err != nil {
            log.Printf("json marshal event %v fail: %v", event, err)
            return
        }
    
        // send to redis
        redisKey := fmt.Sprintf(g.Config().Alarm.QueuePattern, event.Priority())                        //redis键名
        rc := g.RedisConnPool.Get()
        defer rc.Close()
        rc.Do("LPUSH", redisKey, string(bs))  //LPUSH存储
    }
    
    
    • 报警函数分析
    # Max
    # 例如: max(#3)
    #    对于最新的3个点,其最大值满足阈值条件则报警
    type MaxFunction struct {
        Function
        Limit      int     //点数
        Operator   string  //操作符
        RightValue float64 //阀值 
    }
    func (this MaxFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
        vs, isEnough = L.HistoryData(this.Limit) //取指定的历史数据
        if !isEnough {
            return
        }
     
        max := vs[0].Value
        //取最大值 
        for i := 1; i < this.Limit; i++ {
            if max < vs[i].Value {
                max = vs[i].Value
            }
        }
    
        leftValue = max
        isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue) //操作符判断返回true|false
        return
    }
    
    # Min
    # 如: min(#3)
    #     对于最新的3个点,其最小值满足阈值条件则报警
    type MinFunction struct {
        Function
        Limit      int
        Operator   string
        RightValue float64
    }
    func (this MinFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
        vs, isEnough = L.HistoryData(this.Limit)
        if !isEnough {
            return
        }
    
        min := vs[0].Value
        //取最小值
        for i := 1; i < this.Limit; i++ {
            if min > vs[i].Value {
                min = vs[i].Value
            }
        }
    
        leftValue = min
        isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue) //操作符判断返回true|false
        return
    }
    
    # All
    # 如:all(#3)
    #    最新的3个点都满足阈值条件则报警
    type AllFunction struct {
        Function
        Limit      int
        Operator   string
        RightValue float64
    }
    func (this AllFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
        vs, isEnough = L.HistoryData(this.Limit)
        if !isEnough {
            return
        }
    
        isTriggered = true
        // 遁环判断操作条件
        for i := 0; i < this.Limit; i++ {
            isTriggered = checkIsTriggered(vs[i].Value, this.Operator, this.RightValue) //操作符判断返回true|false
            if !isTriggered {
                break
            }
        }
    
        leftValue = vs[0].Value
        return
    }
    
    # Lookup
    # 如 lookup(#2,3)
    #    最新的3个点中有2个满足条件则报警
    type LookupFunction struct {
        Function
        Num        int   //条件数2
        Limit      int   //点数3
        Operator   string
        RightValue float64
    }
    func (this LookupFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
        vs, isEnough = L.HistoryData(this.Limit)
        if !isEnough {
            return
        }
    
        leftValue = vs[0].Value
    
        for n, i := 0, 0; i < this.Limit; i++ {
            if checkIsTriggered(vs[i].Value, this.Operator, this.RightValue) {
                n++           //满足条件则累计
                if n == this.Num {     //达到条件则触发
                    isTriggered = true
                    return
                }
            }
        }
    
        return
    }
    
    # Sum
    # 如 sum(#3)
    #    对于最新的3个点,其和满足阈值条件则报警
    type SumFunction struct {
        Function
        Limit      int  
        Operator   string
        RightValue float64
    }
    func (this SumFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
        vs, isEnough = L.HistoryData(this.Limit)
        if !isEnough {
            return
        }
    
        sum := 0.0
        for i := 0; i < this.Limit; i++ {
            sum += vs[i].Value     //累计和
        }
    
        leftValue = sum
        isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue)    //操作符判断返回true|false
        return 
    }
    
    # Avg
    # 如 avg(#3)
    #    对于最新的3个点,其平均值满足阈值条件则报警
    type AvgFunction struct {
        Function
        Limit      int
        Operator   string
        RightValue float64
    }
    func (this AvgFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
        vs, isEnough = L.HistoryData(this.Limit)
        if !isEnough {
            return
        }
    
        sum := 0.0
        for i := 0; i < this.Limit; i++ {
            sum += vs[i].Value              //累计和
        }
    
        leftValue = sum / float64(this.Limit)  //求平均
        isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue)     //操作符判断返回true|false
        return
    }
    
    # Diff
    # 如 diff(#3)
    #  拿最新push上来的点(被减数),与历史最新的3个点(3个减数)相减,得到3个差
    #  只要有一个差满足阈值条件则报警
    type DiffFunction struct {
        Function
        Limit      int
        Operator   string
        RightValue float64
    }
    // 只要有一个点的diff触发阈值,就报警
    func (this DiffFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
        // 此处this.Limit要+1,因为通常说diff(#3),是当前点与历史的3个点相比较
        // 然而最新点已经在linkedlist的第一个位置,所以……
        vs, isEnough = L.HistoryData(this.Limit + 1)
        if !isEnough {
            return
        }
    
        if len(vs) == 0 {
            isEnough = false
            return
        }
    
        first := vs[0].Value  //最新值
    
        isTriggered = false
        for i := 1; i < this.Limit+1; i++ {
            // diff是当前值减去历史值
            leftValue = first - vs[i].Value
            isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue)  
            if isTriggered {
                break   //只要任何一次满足判断条件则返回True触发
            }
        }
        return
    }
    
    # Pdiff 
    # 如:pdiff(#3)
    #    拿最新push上来的点,与历史最新的3个点相减,得到3个差
    #    再将3个差值分别除以减数,得到3个商值,只要有一个商值满足阈值则报警
    type PDiffFunction struct {
        Function
        Limit      int
        Operator   string
        RightValue float64
    }
    func (this PDiffFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
        vs, isEnough = L.HistoryData(this.Limit + 1)
        if !isEnough {
            return
        }
    
        if len(vs) == 0 {
            isEnough = false
            return
        }
    
        first := vs[0].Value
    
        isTriggered = false
        for i := 1; i < this.Limit+1; i++ {
            if vs[i].Value == 0 {
                continue
            }
            
            // 差/Value*100
            leftValue = (first - vs[i].Value) / vs[i].Value * 100.0
            isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue)
            if isTriggered {
                break
            }
        }
        return
    }
    
    // 操作符解析与判断
    // checkIsTriggered reports whether "leftValue operator rightValue" holds.
    // Supported operators:
    //   - "=" and "==": difference below 0.0001;
    //   - "!=": difference above 0.0001;
    //   - "<", "<=", ">", ">=".
    // Any other operator yields false.
    func checkIsTriggered(leftValue float64, operator string, rightValue float64) (isTriggered bool) {
        const tolerance = 0.0001
        switch operator {
        case "=", "==":
            return math.Abs(leftValue-rightValue) < tolerance
        case "!=":
            return math.Abs(leftValue-rightValue) > tolerance
        case "<":
            return leftValue < rightValue
        case "<=":
            return leftValue <= rightValue
        case ">":
            return leftValue > rightValue
        case ">=":
            return leftValue >= rightValue
        default:
            return false
        }
    }
    

    cron.SyncStrategies() 同步HBS策略和表达式

    
    # 同步策略配置入口函数
    func SyncStrategies() {
        duration := time.Duration(g.Config().Hbs.Interval) * time.Second   //全局配置间隔
        for {
            syncStrategies()   //同步策略项配置函数调用
            syncExpression()   //同步表达式配置函数调用
            syncFilter()       //同步过滤器配置函数调用
            time.Sleep(duration)  //同步间隔
        }
    }
    
    
    ## RPC调用"Hbs.GetStrategies"HBS同步策略
    func syncStrategies() {
        var strategiesResponse model.StrategiesResponse
        err := g.HbsClient.Call("Hbs.GetStrategies", model.NullRpcRequest{}, &strategiesResponse) //RPC调用HBS
        if err != nil {
            log.Println("[ERROR] Hbs.GetStrategies:", err)
            return
        }
        rebuildStrategyMap(&strategiesResponse) //缓存
    }
    ### 归整策略数据和缓存
    func rebuildStrategyMap(strategiesResponse *model.StrategiesResponse) {
        //缓存MAP格式 Key    =>  Value
        //            ||         ||          
        //  endpoint/metric => [strategy1, strategy2 ...]
        m := make(map[string][]model.Strategy)
        for _, hs := range strategiesResponse.HostStrategies {
            hostname := hs.Hostname
            //debug打印
            if g.Config().Debug && hostname == g.Config().DebugHost {
                log.Println(hostname, "strategies:")
                bs, _ := json.Marshal(hs.Strategies)
                fmt.Println(string(bs))
            }
            //数据归整至Map
            for _, strategy := range hs.Strategies {
                key := fmt.Sprintf("%s/%s", hostname, strategy.Metric) 
                if _, exists := m[key]; exists {
                    m[key] = append(m[key], strategy)
                } else {
                    m[key] = []model.Strategy{strategy}
                }
            }
        }
    
        g.StrategyMap.ReInit(m)  //初始化全局变量
    }
    
    
    ## RPC调用"Hbs.GetExpressions"HBS同步表达式
    func syncExpression() {
        var expressionResponse model.ExpressionResponse
        err := g.HbsClient.Call("Hbs.GetExpressions", model.NullRpcRequest{}, &expressionResponse)  //RPC调用HBS
        if err != nil {
            log.Println("[ERROR] Hbs.GetExpressions:", err)
            return
        }
    
        rebuildExpressionMap(&expressionResponse)  //缓存
    }
    ### 归整表达式数据和缓存
    
    func rebuildExpressionMap(expressionResponse *model.ExpressionResponse) {
        m := make(map[string][]*model.Expression)
             //缓存MAP格式 Key    =>    Value
             //          ||            ||          
             //      metric/k=v  => [expression1, expression2 ...]
        for _, exp := range expressionResponse.Expressions {
            for k, v := range exp.Tags {
                key := fmt.Sprintf("%s/%s=%s", exp.Metric, k, v)
                if _, exists := m[key]; exists {
                    m[key] = append(m[key], exp)
                } else {
                    m[key] = []*model.Expression{exp}
                }
            }
        }
    
        g.ExpressionMap.ReInit(m)  //初始化全局变量
    }
    
    
    ## 构建同步过滤器map,以Metric为查询Key
    func syncFilter() {
        m := make(map[string]string)  //缓存map
    
        //M map[string][]model.Strategy
        strategyMap := g.StrategyMap.Get() //获取同步的strategyMap
        for _, strategies := range strategyMap {
            for _, strategy := range strategies {
                m[strategy.Metric] = strategy.Metric
            }
        }  //迭代Metric
    
        //M map[string][]*model.Expression 
        expressionMap := g.ExpressionMap.Get() //获取同步的expressionMap
        for _, expressions := range expressionMap {
            for _, expression := range expressions {
                m[expression.Metric] = expression.Metric
            }
        }  //迭代Metric
        g.FilterMap.ReInit(m)   //初始化全局变量
    }
    
    #### 全局(StrategyMap、ExpressionMap、FilterMap)变量和缓存初始化
    var (
        StrategyMap   = &SafeStrategyMap{M: make(map[string][]model.Strategy)}
        ExpressionMap = &SafeExpressionMap{M: make(map[string][]*model.Expression)}
        FilterMap     = &SafeFilterMap{M: make(map[string]string)}
    )
    
    func (this *SafeStrategyMap) ReInit(m map[string][]model.Strategy) {
        this.Lock()
        defer this.Unlock()
        this.M = m
    }
    
    func (this *SafeExpressionMap) ReInit(m map[string][]*model.Expression) {
        this.Lock()
        defer this.Unlock()
        this.M = m
    }
    
    func (this *SafeFilterMap) ReInit(m map[string]string) {
        this.Lock()
        defer this.Unlock()
        this.M = m
    }
    
    
    // Strategy is a host-level alarm rule fetched from HBS. It applies to items
    // with the same endpoint and metric whose tags include every entry in Tags.
    type Strategy struct {
        Id         int               `json:"id"`
        Metric     string            `json:"metric"`
        Tags       map[string]string `json:"tags"`
        Func       string            `json:"func"`       // e.g. max(#3) all(#3)
        Operator   string            `json:"operator"`   // e.g. < !=
        RightValue float64           `json:"rightValue"` // critical value
        MaxStep    int               `json:"maxStep"`    // max number of consecutive PROBLEM alarms; 0 mutes
        Priority   int               `json:"priority"`
        Note       string            `json:"note"`
        Tpl        *Template         `json:"tpl"`        // template the strategy belongs to
    }
    
    // Expression is an alarm rule keyed by metric and tags rather than by host.
    // It applies to items whose tags include every entry in Tags.
    type Expression struct {
        Id         int               `json:"id"`
        Metric     string            `json:"metric"`
        Tags       map[string]string `json:"tags"`
        Func       string            `json:"func"`       // e.g. max(#3) all(#3)
        Operator   string            `json:"operator"`   // e.g. < !=
        RightValue float64           `json:"rightValue"` // critical value
        MaxStep    int               `json:"maxStep"`    // max number of consecutive PROBLEM alarms; 0 mutes
        Priority   int               `json:"priority"`
        Note       string            `json:"note"`
        ActionId   int               `json:"actionId"`   // id of the action to execute
    }
    
    

    cron.CleanStale()

    # 定期清理任务运行入口
    func CleanStale() {
        for {
            time.Sleep(time.Hour * 5)
            cleanStale()  //调用清理
        }
    }
    
    ##清理7天之前的历史过期数据
    func cleanStale() {
        before := time.Now().Unix() - 3600*24*7
    
        arr := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"}
        for i := 0; i < 16; i++ {
            for j := 0; j < 16; j++ {
                store.HistoryBigMap[arr[i]+arr[j]].CleanStale(before) //清理BigMap数据
            }
        }
    }
    
    #清理实现
    func (this *JudgeItemMap) CleanStale(before int64) {
        keys := []string{}
    
        this.RLock()
        for key, L := range this.M {
            front := L.Front()
            if front == nil {
                continue
            }
            //迭代匹配时间戳,小于则过期
            if front.Value.(*model.JudgeItem).Timestamp < before {
                keys = append(keys, key)
            }
        }
        this.RUnlock()
       
        //批量清理
        this.BatchDelete(keys)
    }
    
    func (this *JudgeItemMap) BatchDelete(keys []string) {
        count := len(keys)
        if count == 0 {
            return
        }
    
        this.Lock()
        defer this.Unlock()
        for i := 0; i < count; i++ {
            delete(this.M, keys[i])  //map delete条目
        }
    }
    
    
    

    技术经验借鉴

    • BigMap内存构造缓存历史数据机制与应用
    • 报警计算函数(Function)设计中"策略模式"的应用
    • SafeLinkedList并发安全的链表LIST操作实现

    扩展学习

    相关文章

      网友评论

          本文标题:OpenFalcon源码分析(Judge组件)

          本文链接:https://www.haomeiwen.com/subject/bxpmnftx.html