美文网首页
OpenFalcon源码分析(Judge组件)

OpenFalcon源码分析(Judge组件)

作者: Xiao_Yang | 来源:发表于2018-09-21 09:34 被阅读0次

Judge版本

VERSION = "2.0.2"

Judge组件功能

Judge是用于判断是否触发报警条件的组件。
Transfer的数据不但要转送到Graph来存储并绘图,还要转送到Judge用于报警判断。Judge先从hbs获取所有策略列表,静等Transfer的数据转发。
每收到一条Transfer转发过来的数据,立即找到这条数据关联的Strategy、Expression,然后做阈值判断。【官方描述】

Judge组件逻辑图

Judge逻辑图

Portal关于报警策略与表达式定义操作说明书

操作文档

main主入口分析

func main() {
    cfg := flag.String("c", "cfg.json", "configuration file")
    version := flag.Bool("v", false, "show version")
    flag.Parse()

    if *version {
        fmt.Println(g.VERSION)
        os.Exit(0)
    }

    g.ParseConfig(*cfg)     //全局配置文件解析

    g.InitRedisConnPool()  //初始化Redis连接池 【参考详细分析】
    g.InitHbsClient()      //初始化HBS客户端连接【参考详细分析】
 
    store.InitHistoryBigMap()  //BigMap缓存指定采集数据 【参考详细分析】

    go http.Start()       //Http API服务启动 【参考详细分析】
    go rpc.Start()        //RPC服务启动      【参考详细分析】

    go cron.SyncStrategies()  //周期任务,同步HBS策略和表达式    【参考详细分析】
    go cron.CleanStale()      //周期任务,清理过时策略 【参考详细分析】

    select {}
}

g.ParseConfig(*cfg) 初始化全局配置文件

type GlobalConfig struct {
    Debug     bool         `json:"debug"`
    DebugHost string       `json:"debugHost"`
    Remain    int          `json:"remain"`
    Http      *HttpConfig  `json:"http"`
    Rpc       *RpcConfig   `json:"rpc"`
    Hbs       *HbsConfig   `json:"hbs"`
    Alarm     *AlarmConfig `json:"alarm"`
}

func Config() *GlobalConfig {
    configLock.RLock()
    defer configLock.RUnlock()
    return config
}

# 解析全局配置
func ParseConfig(cfg string) {
    if cfg == "" {   //参数配置
        log.Fatalln("use -c to specify configuration file")
    }

    if !file.IsExist(cfg) {  //是否存在
        log.Fatalln("config file:", cfg, "is not existent")
    }

    ConfigFile = cfg

    configContent, err := file.ToTrimString(cfg) //字符串
    if err != nil {
        log.Fatalln("read config file:", cfg, "fail:", err)
    }

    var c GlobalConfig
    err = json.Unmarshal([]byte(configContent), &c) //反序列为结构
    if err != nil {
        log.Fatalln("parse config file:", cfg, "fail:", err)
    }

    configLock.Lock()
    defer configLock.Unlock()

    config = &c

    log.Println("read config file:", cfg, "successfully")
}



g.InitRedisConnPool() 初始化Redis连接池

func InitRedisConnPool() {
    if !Config().Alarm.Enabled {
        return
    }

    dsn := Config().Alarm.Redis.Dsn
    maxIdle := Config().Alarm.Redis.MaxIdle
    idleTimeout := 240 * time.Second

    connTimeout := time.Duration(Config().Alarm.Redis.ConnTimeout) * time.Millisecond
    readTimeout := time.Duration(Config().Alarm.Redis.ReadTimeout) * time.Millisecond
    writeTimeout := time.Duration(Config().Alarm.Redis.WriteTimeout) * time.Millisecond

    RedisConnPool = &redis.Pool{
        MaxIdle:     maxIdle,
        IdleTimeout: idleTimeout,
        Dial: func() (redis.Conn, error) {
            c, err := redis.DialTimeout("tcp", dsn, connTimeout, readTimeout, writeTimeout)
            if err != nil {
                return nil, err
            }
            return c, err
        },
        TestOnBorrow: PingRedis,
    }
}

func PingRedis(c redis.Conn, t time.Time) error {
    _, err := c.Do("ping")
    if err != nil {
        log.Println("[ERROR] ping redis fail", err)
    }
    return err
}

g.InitHbsClient() 实例化HBS客户端对象

func InitHbsClient() {
    HbsClient = &SingleConnRpcClient{
        RpcServers: Config().Hbs.Servers,
        Timeout:    time.Duration(Config().Hbs.Timeout) * time.Millisecond,
    }
}

store.InitHistoryBigMap() 初始化内存BigMap,存在采集历史数据

# 创建BigMap([256]JudgeItemMap),存放采集的监控历史数据
# [00..f] JudgeItemMap
#  .
#  .
# [f0..f] JudgeItemMap
func InitHistoryBigMap() {
    arr := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"}
    for i := 0; i < 16; i++ {
        for j := 0; j < 16; j++ {
            HistoryBigMap[arr[i]+arr[j]] = NewJudgeItemMap() 
        }
    }
}


## 创建与初始化JudgeItemMap -> map[string]*SafeLinkedList
func NewJudgeItemMap() *JudgeItemMap {
    return &JudgeItemMap{M: make(map[string]*SafeLinkedList)}
}

//这是个线程不安全的大Map,需要提前初始化好
var HistoryBigMap = make(map[string]*JudgeItemMap)

//JudgeItemMap结构体
type JudgeItemMap struct {
    sync.RWMutex
    M map[string]*SafeLinkedList
}

//SafeLinkedList结构体,"container/list"
type SafeLinkedList struct {
    sync.RWMutex
    L *list.List
}

http.Start() HTTP API服务监听与处理

func init() {
    configCommonRoutes()  //组件公共API路由,可参考HBS模块
    configInfoRoutes()    //信息查询API路由 
}

func Start() {
    if !g.Config().Http.Enabled {   //开启HTTP
        return
    }

    addr := g.Config().Http.Listen  //全局配置监听端品
    if addr == "" {
        return
    }
    s := &http.Server{    
        Addr:           addr,
        MaxHeaderBytes: 1 << 30,
    }
    log.Println("http listening", addr)
    log.Fatalln(s.ListenAndServe())
}

func configInfoRoutes() {
    // e.g. /strategy/lg-dinp-docker01.bj/cpu.idle
    http.HandleFunc("/strategy/", func(w http.ResponseWriter, r *http.Request) {})

    // e.g. /expression/net.port.listen/port=22
    http.HandleFunc("/expression/", func(w http.ResponseWriter, r *http.Request) {})

    //统计bigmap数据总长度
    http.HandleFunc("/count", func(w http.ResponseWriter, r *http.Request) {})
    //查看BigMap内指定的历史数据
    http.HandleFunc("/history/", func(w http.ResponseWriter, r *http.Request) {})

}


rpc.Start() RPC服务注册与处理


func Start() {
    if !g.Config().Rpc.Enabled {
        return
    }
    addr := g.Config().Rpc.Listen
    tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
    if err != nil {
        log.Fatalf("net.ResolveTCPAddr fail: %s", err)
    }

    listener, err := net.ListenTCP("tcp", tcpAddr)
    if err != nil {
        log.Fatalf("listen %s fail: %s", addr, err)
    } else {
        log.Println("rpc listening", addr)
    }

    rpc.Register(new(Judge))   //注册Judge

    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Printf("listener.Accept occur error: %s", err)
            continue
        }
        go rpc.ServeConn(conn)
    }
}

## RPC Judge.Ping方法
type Judge int
func (this *Judge) Ping(req model.NullRpcRequest, resp *model.SimpleRpcResponse) error {
    return nil
}
## RPC Judge.Send方法,Transfer使用此RPC方式上传数据
func (this *Judge) Send(items []*model.JudgeItem, resp *model.SimpleRpcResponse) error {
    remain := g.Config().Remain   //最大保留多少次历史记录,由全局配置文件定义
    // 把当前时间的计算放在最外层,是为了减少获取时间时的系统调用开销
    now := time.Now().Unix()
    for _, item := range items {
        exists := g.FilterMap.Exists(item.Metric)  //判断缓存filtermap是否存在匹配Metric相关策略,无相关策略将不缓存此数据
        if !exists {
            continue
        }
        pk := item.PrimaryKey() //生成HASH key
        store.HistoryBigMap[pk[0:2]].PushFrontAndMaintain(pk, item, remain, now)  //缓存历史数据
    }
    return nil
}

//JudgeItem数据结构
type JudgeItem struct {
        Endpoint    string            `json:"endpoint"`
        Metric        string            `json:"metric"`
        Value        float64            `json:"value"`
        Timestamp    int64            `json:"timestamp"`
        JudgeType    string            `json:"judgeType"`
        Tags        map[string]string    `json:"tags"`
    }

  • 历史数据的缓存逻辑分析
#生成MD5 Hash值
func (this *JudgeItem) PrimaryKey() string {
    return utils.Md5(utils.PK(this.Endpoint, this.Metric, this.Tags))
}

#基于HASH前两位做为索引KEY,存入HistoryBigMap且Judge计算处理
store.HistoryBigMap[pk[0:2]].PushFrontAndMaintain(pk, item, remain, now)

#JudgeItem Map缓存和Judge计算
func (this *JudgeItemMap) PushFrontAndMaintain(key string, val *model.JudgeItem, maxCount int, now int64) {
    //JudgeItemMap.Get(HASH)找是否存在HASH KEY项列表,如果存在Push
    //数据和校验数据有效性后Judge计算;如果不存在则基于HASH key创建
    //JudgeItemMap并Push数据。
    if linkedList, exists := this.Get(key); exists {
        needJudge := linkedList.PushFrontAndMaintain(val, maxCount)
        if needJudge {
            Judge(linkedList, val, now) //【参考Judge逻辑分析】
        }
    } else {
        NL := list.New()
        NL.PushFront(val)   //push into list
        safeList := &SafeLinkedList{L: NL}  //create safelist
        this.Set(key, safeList)    //save into JudgeItemMap[hash]
        Judge(safeList, val, now)  //【参考Judge逻辑分析】
    }
}

# @return needJudge 如果是false不需要做judge,因为新上来的数据不合法
func (this *SafeLinkedList) PushFrontAndMaintain(v *model.JudgeItem, maxCount int) bool {
    this.Lock()
    defer this.Unlock()

    sz := this.L.Len()
    if sz > 0 {
        // 新push上来的数据有可能重复了(等于以前ts),或者timestamp不对(小于以前ts),这种数据要丢掉
        if v.Timestamp <= this.L.Front().Value.(*model.JudgeItem).Timestamp || v.Timestamp <= 0 {
            return false
        }
    }

    this.L.PushFront(v)  //最新数据放置在列表首

    sz++
    if sz <= maxCount {
        return true
    }
   
    //达到最大保存历史数据项条数后则清除尾部
    del := sz - maxCount
    for i := 0; i < del; i++ {
        this.L.Remove(this.L.Back())   //删除列表尾记录
    }

    return true
}
  • Judge报警判断逻辑分析
# Judge入口函数
func Judge(L *SafeLinkedList, firstItem *model.JudgeItem, now int64) {
    CheckStrategy(L, firstItem, now)   //Strategy处理
    CheckExpression(L, firstItem, now) //Expression处理
}

## 策略检测及发送事件
func CheckStrategy(L *SafeLinkedList, firstItem *model.JudgeItem, now int64) {
    key := fmt.Sprintf("%s/%s", firstItem.Endpoint, firstItem.Metric)
    strategyMap := g.StrategyMap.Get()
    strategies, exists := strategyMap[key]
    if !exists {
        return
    }

    for _, s := range strategies {
        // 因为key仅仅是endpoint和metric,所以得到的strategies并不一定是与当前judgeItem相关的
        // 比如lg-dinp-docker01.bj配置了两个proc.num的策略,一个name=docker,一个name=agent
        // 所以此处要排除掉一部分
        related := true
        for tagKey, tagVal := range s.Tags {
            if myVal, exists := firstItem.Tags[tagKey]; !exists || myVal != tagVal {
                related = false
                break
            }
        }

        if !related {
            continue
        }

        judgeItemWithStrategy(L, s, firstItem, now)
    }
}

### 判断采集数据,如果匹配策略计算条件则发送报警事件
func judgeItemWithStrategy(L *SafeLinkedList, strategy model.Strategy, firstItem *model.JudgeItem, now int64) {
    fn, err := ParseFuncFromString(strategy.Func, strategy.Operator, strategy.RightValue)  //解析报警函数
    if err != nil {
        log.Printf("[ERROR] parse func %s fail: %v. strategy id: %d", strategy.Func, err, strategy.Id)
        return
    }

    historyData, leftValue, isTriggered, isEnough := fn.Compute(L)  //执行判断与计算
    if !isEnough {
        return
    }
  
   // 格式化事件信息
    event := &model.Event{
        Id:         fmt.Sprintf("s_%d_%s", strategy.Id, firstItem.PrimaryKey()),
        Strategy:   &strategy,
        Endpoint:   firstItem.Endpoint,
        LeftValue:  leftValue,
        EventTime:  firstItem.Timestamp,
        PushedTags: firstItem.Tags,
    }

    sendEventIfNeed(historyData, isTriggered, now, event, strategy.MaxStep)   //依据执行判断结果决定发送报警事件
}

## 表达式检测及发送事件
func CheckExpression(L *SafeLinkedList, firstItem *model.JudgeItem, now int64) {
    keys := buildKeysFromMetricAndTags(firstItem)
    if len(keys) == 0 {
        return
    }

    // expression可能会被多次重复处理,用此数据结构保证只被处理一次
    handledExpression := make(map[int]struct{})

    expressionMap := g.ExpressionMap.Get()
    for _, key := range keys {
        expressions, exists := expressionMap[key]  //查询是否存在采集数据相对应的表达式
        if !exists {
            continue
        }

        related := filterRelatedExpressions(expressions, firstItem)   //过滤与采集数据相关的Expression 
        for _, exp := range related {
            if _, ok := handledExpression[exp.Id]; ok {
                continue
            }
            handledExpression[exp.Id] = struct{}{}
            judgeItemWithExpression(L, exp, firstItem, now)
        }
    }
}

### 过滤与采集数据相关的Expression
func filterRelatedExpressions(expressions []*model.Expression, firstItem *model.JudgeItem) []*model.Expression {
    size := len(expressions)
    if size == 0 {
        return []*model.Expression{}
    }

    exps := make([]*model.Expression, 0, size)

    for _, exp := range expressions {

        related := true
        
        itemTagsCopy := firstItem.Tags
        // 注意:exp.Tags 中可能会有一个endpoint=xxx的tag
        if _, ok := exp.Tags["endpoint"]; ok {
            itemTagsCopy = copyItemTags(firstItem)
        }

        for tagKey, tagVal := range exp.Tags {
            if myVal, exists := itemTagsCopy[tagKey]; !exists || myVal != tagVal {
                related = false
                break
            }
        }

        if !related {
            continue
        }
        exps = append(exps, exp)    //[]exps
    }
    return exps
}

### 判断采集数据,如果匹配表达式计算条件则发送报警事件
func judgeItemWithExpression(L *SafeLinkedList, expression *model.Expression, firstItem *model.JudgeItem, now int64) {
    fn, err := ParseFuncFromString(expression.Func, expression.Operator, expression.RightValue)  //解析报警函数
    if err != nil {
        log.Printf("[ERROR] parse func %s fail: %v. expression id: %d", expression.Func, err, expression.Id)
        return
    }

    historyData, leftValue, isTriggered, isEnough := fn.Compute(L)  //执行判断与计算
    if !isEnough {
        return
    }

    // 格式化事件信息
    event := &model.Event{
        Id:         fmt.Sprintf("e_%d_%s", expression.Id, firstItem.PrimaryKey()),
        Expression: expression,
        Endpoint:   firstItem.Endpoint,
        LeftValue:  leftValue,
        EventTime:  firstItem.Timestamp,
        PushedTags: firstItem.Tags,
    }

    sendEventIfNeed(historyData, isTriggered, now, event, expression.MaxStep) //依据执行判断结果决定发送报警事件

}

## 解析操作字符串,转化为报警函数
func ParseFuncFromString(str string, operator string, rightValue float64) (fn Function, err error) {
    if str == "" {
        return nil, fmt.Errorf("func can not be null!")
    }
    idx := strings.Index(str, "#") //以#为定位符
    args, err := atois(str[idx+1 : len(str)-1])  //字位符后为参数
    if err != nil {
        return nil, err
    }

    switch str[:idx-1] {   //定位符前为函数名
    case "max":
        fn = &MaxFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
    case "min":
        fn = &MinFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
    case "all":
        fn = &AllFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
    case "sum":
        fn = &SumFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
    case "avg":
        fn = &AvgFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
    case "diff":
        fn = &DiffFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
    case "pdiff":
        fn = &PDiffFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
    case "lookup":
        fn = &LookupFunction{Num: args[0], Limit: args[1], Operator: operator, RightValue: rightValue}
    default:
        err = fmt.Errorf("not_supported_method")
    }
    return
}

## 判断与发送事件
func sendEventIfNeed(historyData []*model.HistoryData, isTriggered bool, now int64, event *model.Event, maxStep int) {
    lastEvent, exists := g.LastEvents.Get(event.Id)
    if isTriggered {
        event.Status = "PROBLEM"
        if !exists || lastEvent.Status[0] == 'O' {
            // 本次触发了阈值,之前又没报过警,得产生一个报警Event
            event.CurrentStep = 1

            // 但是有些用户把最大报警次数配置成了0,相当于屏蔽了,要检查一下
            if maxStep == 0 {
                return
            }

            sendEvent(event)  //发送事件
            return
        }

        // 逻辑走到这里,说明之前Event是PROBLEM状态
        if lastEvent.CurrentStep >= maxStep {
            // 报警次数已经足够多,到达了最多报警次数了,不再报警
            return
        }

        if historyData[len(historyData)-1].Timestamp <= lastEvent.EventTime {
            // 产生过报警的点,就不能再使用来判断了,否则容易出现一分钟报一次的情况
            // 只需要拿最后一个historyData来做判断即可,因为它的时间最老
            return
        }

        if now-lastEvent.EventTime < g.Config().Alarm.MinInterval {
            // 报警不能太频繁,两次报警之间至少要间隔MinInterval秒,否则就不能报警
            return
        }

        event.CurrentStep = lastEvent.CurrentStep + 1
        sendEvent(event)
    } else {
        // 如果LastEvent是Problem,报OK,否则啥都不做
        if exists && lastEvent.Status[0] == 'P' {
            event.Status = "OK"          //状态转OK
            event.CurrentStep = 1
            sendEvent(event)  //发送事件
        }
    }
}

### sendEvent将事件保存至Redis(预警的异步机制)
func sendEvent(event *model.Event) {
    // update last event
    g.LastEvents.Set(event.Id, event)  //事件缓存

    bs, err := json.Marshal(event)     //Json序列化事件
    if err != nil {
        log.Printf("json marshal event %v fail: %v", event, err)
        return
    }

    // send to redis
    redisKey := fmt.Sprintf(g.Config().Alarm.QueuePattern, event.Priority())                        //redis键名
    rc := g.RedisConnPool.Get()
    defer rc.Close()
    rc.Do("LPUSH", redisKey, string(bs))  //LPUSH存储
}

  • 报警函数分析
# Max
# 例如: max(#3)
#    对于最新的3个点,其最大值满足阈值条件则报警
type MaxFunction struct {
    Function
    Limit      int     //点数
    Operator   string  //操作符
    RightValue float64 //阀值 
}
func (this MaxFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
    vs, isEnough = L.HistoryData(this.Limit) //取指定的历史数据
    if !isEnough {
        return
    }
 
    max := vs[0].Value
    //取最大值 
    for i := 1; i < this.Limit; i++ {
        if max < vs[i].Value {
            max = vs[i].Value
        }
    }

    leftValue = max
    isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue) //操作符判断返回true|false
    return
}

# Min
# 如: min(#3)
#     对于最新的3个点,其最小值满足阈值条件则报警
type MinFunction struct {
    Function
    Limit      int
    Operator   string
    RightValue float64
}
func (this MinFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
    vs, isEnough = L.HistoryData(this.Limit)
    if !isEnough {
        return
    }

    min := vs[0].Value
    //取最小值
    for i := 1; i < this.Limit; i++ {
        if min > vs[i].Value {
            min = vs[i].Value
        }
    }

    leftValue = min
    isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue) //操作符判断返回true|false
    return
}

# All
# 如:all(#3)
#    最新的3个点都满足阈值条件则报警
type AllFunction struct {
    Function
    Limit      int
    Operator   string
    RightValue float64
}
func (this AllFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
    vs, isEnough = L.HistoryData(this.Limit)
    if !isEnough {
        return
    }

    isTriggered = true
    // 遁环判断操作条件
    for i := 0; i < this.Limit; i++ {
        isTriggered = checkIsTriggered(vs[i].Value, this.Operator, this.RightValue) //操作符判断返回true|false
        if !isTriggered {
            break
        }
    }

    leftValue = vs[0].Value
    return
}

# Lookup
# 如 lookup(#2,3)
#    最新的3个点中有2个满足条件则报警
type LookupFunction struct {
    Function
    Num        int   //条件数2
    Limit      int   //点数3
    Operator   string
    RightValue float64
}
func (this LookupFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
    vs, isEnough = L.HistoryData(this.Limit)
    if !isEnough {
        return
    }

    leftValue = vs[0].Value

    for n, i := 0, 0; i < this.Limit; i++ {
        if checkIsTriggered(vs[i].Value, this.Operator, this.RightValue) {
            n++           //满足条件则累计
            if n == this.Num {     //达到条件则触发
                isTriggered = true
                return
            }
        }
    }

    return
}

# Sum
# 如 sum(#3)
#    对于最新的3个点,其和满足阈值条件则报警
type SumFunction struct {
    Function
    Limit      int  
    Operator   string
    RightValue float64
}
func (this SumFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
    vs, isEnough = L.HistoryData(this.Limit)
    if !isEnough {
        return
    }

    sum := 0.0
    for i := 0; i < this.Limit; i++ {
        sum += vs[i].Value     //累计和
    }

    leftValue = sum
    isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue)    //操作符判断返回true|false
    return 
}

# Avg
# 如 avg(#3)
#    对于最新的3个点,其平均值满足阈值条件则报警
type AvgFunction struct {
    Function
    Limit      int
    Operator   string
    RightValue float64
}
func (this AvgFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
    vs, isEnough = L.HistoryData(this.Limit)
    if !isEnough {
        return
    }

    sum := 0.0
    for i := 0; i < this.Limit; i++ {
        sum += vs[i].Value              //累计和
    }

    leftValue = sum / float64(this.Limit)  //求平均
    isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue)     //操作符判断返回true|false
    return
}

# Diff
# 如 diff(#3)
#  拿最新push上来的点(被减数),与历史最新的3个点(3个减数)相减,得到3个差
#  只要有一个差满足阈值条件则报警
type DiffFunction struct {
    Function
    Limit      int
    Operator   string
    RightValue float64
}
// 只要有一个点的diff触发阈值,就报警
func (this DiffFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
    // 此处this.Limit要+1,因为通常说diff(#3),是当前点与历史的3个点相比较
    // 然而最新点已经在linkedlist的第一个位置,所以……
    vs, isEnough = L.HistoryData(this.Limit + 1)
    if !isEnough {
        return
    }

    if len(vs) == 0 {
        isEnough = false
        return
    }

    first := vs[0].Value  //最新值

    isTriggered = false
    for i := 1; i < this.Limit+1; i++ {
        // diff是当前值减去历史值
        leftValue = first - vs[i].Value
        isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue)  
        if isTriggered {
            break   //只要任何一次满足判断条件则返回True触发
        }
    }
    return
}

# Pdiff 
# 如:pdiff(#3)
#    拿最新push上来的点,与历史最新的3个点相减,得到3个差
#    再将3个差值分别除以减数,得到3个商值,只要有一个商值满足阈值则报警
type PDiffFunction struct {
    Function
    Limit      int
    Operator   string
    RightValue float64
}
func (this PDiffFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
    vs, isEnough = L.HistoryData(this.Limit + 1)
    if !isEnough {
        return
    }

    if len(vs) == 0 {
        isEnough = false
        return
    }

    first := vs[0].Value

    isTriggered = false
    for i := 1; i < this.Limit+1; i++ {
        if vs[i].Value == 0 {
            continue
        }
        
        // 差/Value*100
        leftValue = (first - vs[i].Value) / vs[i].Value * 100.0
        isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue)
        if isTriggered {
            break
        }
    }
    return
}

# 操作符解析与判断
func checkIsTriggered(leftValue float64, operator string, rightValue float64) (isTriggered bool) {
    switch operator {
    case "=", "==":
        isTriggered = math.Abs(leftValue-rightValue) < 0.0001
    case "!=":
        isTriggered = math.Abs(leftValue-rightValue) > 0.0001
    case "<":
        isTriggered = leftValue < rightValue
    case "<=":
        isTriggered = leftValue <= rightValue
    case ">":
        isTriggered = leftValue > rightValue
    case ">=":
        isTriggered = leftValue >= rightValue
    }
    return
}

cron.SyncStrategies() 同步HBS策略和表达式


# 同步策略配置入口函数
func SyncStrategies() {
    duration := time.Duration(g.Config().Hbs.Interval) * time.Second   //全局配置间隔
    for {
        syncStrategies()   //同步策略项配置函数调用
        syncExpression()   //同步表达式配置函数调用
        syncFilter()       //同步过滤器配置函数调用
        time.Sleep(duration)  //同步间隔
    }
}


## RPC调用"Hbs.GetStrategies"HBS同步策略
func syncStrategies() {
    var strategiesResponse model.StrategiesResponse
    err := g.HbsClient.Call("Hbs.GetStrategies", model.NullRpcRequest{}, &strategiesResponse) //RPC调用HBS
    if err != nil {
        log.Println("[ERROR] Hbs.GetStrategies:", err)
        return
    }
    rebuildStrategyMap(&strategiesResponse) //缓存
}
### 归整策略数据和缓存
func rebuildStrategyMap(strategiesResponse *model.StrategiesResponse) {
    //缓存MAP格式 Key    =>  Value
    //            ||         ||          
    //  endpoint/metric => [strategy1, strategy2 ...]
    m := make(map[string][]model.Strategy)
    for _, hs := range strategiesResponse.HostStrategies {
        hostname := hs.Hostname
        //debug打印
        if g.Config().Debug && hostname == g.Config().DebugHost {
            log.Println(hostname, "strategies:")
            bs, _ := json.Marshal(hs.Strategies)
            fmt.Println(string(bs))
        }
        //数据归整至Map
        for _, strategy := range hs.Strategies {
            key := fmt.Sprintf("%s/%s", hostname, strategy.Metric) 
            if _, exists := m[key]; exists {
                m[key] = append(m[key], strategy)
            } else {
                m[key] = []model.Strategy{strategy}
            }
        }
    }

    g.StrategyMap.ReInit(m)  //初始化全局变量
}


## RPC调用"Hbs.GetExpressions"HBS同步表达式
func syncExpression() {
    var expressionResponse model.ExpressionResponse
    err := g.HbsClient.Call("Hbs.GetExpressions", model.NullRpcRequest{}, &expressionResponse)  //RPC调用HBS
    if err != nil {
        log.Println("[ERROR] Hbs.GetExpressions:", err)
        return
    }

    rebuildExpressionMap(&expressionResponse)  //缓存
}
### 归整表达式数据和缓存

func rebuildExpressionMap(expressionResponse *model.ExpressionResponse) {
    m := make(map[string][]*model.Expression)
         //缓存MAP格式 Key    =>    Value
         //          ||            ||          
         //      metric/k=v  => [expression1, expression2 ...]
    for _, exp := range expressionResponse.Expressions {
        for k, v := range exp.Tags {
            key := fmt.Sprintf("%s/%s=%s", exp.Metric, k, v)
            if _, exists := m[key]; exists {
                m[key] = append(m[key], exp)
            } else {
                m[key] = []*model.Expression{exp}
            }
        }
    }

    g.ExpressionMap.ReInit(m)  //初始化全局变量
}


## 构建同步过滤器map,以Metric为查询Key
func syncFilter() {
    m := make(map[string]string)  //缓存map

    //M map[string][]model.Strategy
    strategyMap := g.StrategyMap.Get() //获取同步的strategyMap
    for _, strategies := range strategyMap {
        for _, strategy := range strategies {
            m[strategy.Metric] = strategy.Metric
        }
    }  //迭代Metric

    //M map[string][]*model.Expression 
    expressionMap := g.ExpressionMap.Get() //获取同步的expressionMap
    for _, expressions := range expressionMap {
        for _, expression := range expressions {
            m[expression.Metric] = expression.Metric
        }
    }  //迭代Metric
    g.FilterMap.ReInit(m)   //初始化全局变量
}

#### 全局(StrategyMap、ExpressionMap、FilterMap)变量和缓存初始化
var (
    StrategyMap   = &SafeStrategyMap{M: make(map[string][]model.Strategy)}
    ExpressionMap = &SafeExpressionMap{M: make(map[string][]*model.Expression)}
    FilterMap     = &SafeFilterMap{M: make(map[string]string)}
)

func (this *SafeStrategyMap) ReInit(m map[string][]model.Strategy) {
    this.Lock()
    defer this.Unlock()
    this.M = m
}

func (this *SafeExpressionMap) ReInit(m map[string][]*model.Expression) {
    this.Lock()
    defer this.Unlock()
    this.M = m
}

func (this *SafeFilterMap) ReInit(m map[string]string) {
    this.Lock()
    defer this.Unlock()
    this.M = m
}


####策略结构体定义
type Strategy struct {
    Id         int               `json:"id"`
    Metric     string            `json:"metric"`
    Tags       map[string]string `json:"tags"`
    Func       string            `json:"func"`       // e.g. max(#3) all(#3)
    Operator   string            `json:"operator"`   // e.g. < !=
    RightValue float64           `json:"rightValue"` // critical value
    MaxStep    int               `json:"maxStep"`
    Priority   int               `json:"priority"`
    Note       string            `json:"note"`
    Tpl        *Template         `json:"tpl"`        //模版
}

####表达式结构体定义
type Expression struct {
    Id         int               `json:"id"`
    Metric     string            `json:"metric"`
    Tags       map[string]string `json:"tags"`
    Func       string            `json:"func"`       // e.g. max(#3) all(#3)
    Operator   string            `json:"operator"`   // e.g. < !=
    RightValue float64           `json:"rightValue"` // critical value
    MaxStep    int               `json:"maxStep"`
    Priority   int               `json:"priority"`
    Note       string            `json:"note"`
    ActionId   int               `json:"actionId"`   //执行动作ID
}

cron.CleanStale()

# 定期清理任务运行入口
func CleanStale() {
    for {
        time.Sleep(time.Hour * 5)
        cleanStale()  //调用清理
    }
}

##清理7天之前的历史过期数据
func cleanStale() {
    before := time.Now().Unix() - 3600*24*7

    arr := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"}
    for i := 0; i < 16; i++ {
        for j := 0; j < 16; j++ {
            store.HistoryBigMap[arr[i]+arr[j]].CleanStale(before) //清理BigMap数据
        }
    }
}

#清理实现
func (this *JudgeItemMap) CleanStale(before int64) {
    keys := []string{}

    this.RLock()
    for key, L := range this.M {
        front := L.Front()
        if front == nil {
            continue
        }
        //迭代匹配时间戳,小于则过期
        if front.Value.(*model.JudgeItem).Timestamp < before {
            keys = append(keys, key)
        }
    }
    this.RUnlock()
   
    //批量清理
    this.BatchDelete(keys)
}

func (this *JudgeItemMap) BatchDelete(keys []string) {
    count := len(keys)
    if count == 0 {
        return
    }

    this.Lock()
    defer this.Unlock()
    for i := 0; i < count; i++ {
        delete(this.M, keys[i])  //map delete条目
    }
}


技术经验借鉴

  • BigMap内存构造缓存历史数据机制与应用
  • 针对报警计算funcation的设计模式之 "策略模式"应用
  • SafeLinkedList并发安全的链表LIST操作实现

扩展学习

相关文章

网友评论

      本文标题:OpenFalcon源码分析(Judge组件)

      本文链接:https://www.haomeiwen.com/subject/bxpmnftx.html