Judge版本
VERSION = "2.0.2"
Judge组件功能
Judge是用于判断是否触发报警条件的组件。
Transfer的数据不但要转送到Graph来存储并绘图,还要转送到Judge用于报警判断。Judge先从hbs获取所有策略列表,静等Transfer的数据转发。
每收到一条Transfer转发过来的数据,立即找到这条数据关联的Strategy、Expression,然后做阈值判断。【官方描述】
Judge逻辑图Judge组件逻辑图
Portal关于报警策略与表达式定义操作说明书
main主入口分析
func main() {
cfg := flag.String("c", "cfg.json", "configuration file")
version := flag.Bool("v", false, "show version")
flag.Parse()
if *version {
fmt.Println(g.VERSION)
os.Exit(0)
}
g.ParseConfig(*cfg) //全局配置文件解析
g.InitRedisConnPool() //初始化Redis连接池 【参考详细分析】
g.InitHbsClient() //初始化HBS客户端连接【参考详细分析】
store.InitHistoryBigMap() //BigMap缓存指定采集数据 【参考详细分析】
go http.Start() //Http API服务启动 【参考详细分析】
go rpc.Start() //RPC服务启动 【参考详细分析】
go cron.SyncStrategies() //周期任务,同步HBS策略和表达式 【参考详细分析】
go cron.CleanStale() //周期任务,清理过时策略 【参考详细分析】
select {}
}
g.ParseConfig(*cfg) 初始化全局配置文件
type GlobalConfig struct {
Debug bool `json:"debug"`
DebugHost string `json:"debugHost"`
Remain int `json:"remain"`
Http *HttpConfig `json:"http"`
Rpc *RpcConfig `json:"rpc"`
Hbs *HbsConfig `json:"hbs"`
Alarm *AlarmConfig `json:"alarm"`
}
func Config() *GlobalConfig {
configLock.RLock()
defer configLock.RUnlock()
return config
}
# 解析全局配置
func ParseConfig(cfg string) {
if cfg == "" { //参数配置
log.Fatalln("use -c to specify configuration file")
}
if !file.IsExist(cfg) { //是否存在
log.Fatalln("config file:", cfg, "is not existent")
}
ConfigFile = cfg
configContent, err := file.ToTrimString(cfg) //字符串
if err != nil {
log.Fatalln("read config file:", cfg, "fail:", err)
}
var c GlobalConfig
err = json.Unmarshal([]byte(configContent), &c) //反序列为结构
if err != nil {
log.Fatalln("parse config file:", cfg, "fail:", err)
}
configLock.Lock()
defer configLock.Unlock()
config = &c
log.Println("read config file:", cfg, "successfully")
}
g.InitRedisConnPool() 初始化Redis连接池
func InitRedisConnPool() {
if !Config().Alarm.Enabled {
return
}
dsn := Config().Alarm.Redis.Dsn
maxIdle := Config().Alarm.Redis.MaxIdle
idleTimeout := 240 * time.Second
connTimeout := time.Duration(Config().Alarm.Redis.ConnTimeout) * time.Millisecond
readTimeout := time.Duration(Config().Alarm.Redis.ReadTimeout) * time.Millisecond
writeTimeout := time.Duration(Config().Alarm.Redis.WriteTimeout) * time.Millisecond
RedisConnPool = &redis.Pool{
MaxIdle: maxIdle,
IdleTimeout: idleTimeout,
Dial: func() (redis.Conn, error) {
c, err := redis.DialTimeout("tcp", dsn, connTimeout, readTimeout, writeTimeout)
if err != nil {
return nil, err
}
return c, err
},
TestOnBorrow: PingRedis,
}
}
func PingRedis(c redis.Conn, t time.Time) error {
_, err := c.Do("ping")
if err != nil {
log.Println("[ERROR] ping redis fail", err)
}
return err
}
g.InitHbsClient() 实例化HBS客户端对象
func InitHbsClient() {
HbsClient = &SingleConnRpcClient{
RpcServers: Config().Hbs.Servers,
Timeout: time.Duration(Config().Hbs.Timeout) * time.Millisecond,
}
}
store.InitHistoryBigMap() 初始化内存BigMap,存在采集历史数据
# 创建BigMap([256]JudgeItemMap),存放采集的监控历史数据
# [00..f] JudgeItemMap
# .
# .
# [f0..f] JudgeItemMap
func InitHistoryBigMap() {
arr := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"}
for i := 0; i < 16; i++ {
for j := 0; j < 16; j++ {
HistoryBigMap[arr[i]+arr[j]] = NewJudgeItemMap()
}
}
}
## 创建与初始化JudgeItemMap -> map[string]*SafeLinkedList
func NewJudgeItemMap() *JudgeItemMap {
return &JudgeItemMap{M: make(map[string]*SafeLinkedList)}
}
//这是个线程不安全的大Map,需要提前初始化好
var HistoryBigMap = make(map[string]*JudgeItemMap)
//JudgeItemMap结构体
type JudgeItemMap struct {
sync.RWMutex
M map[string]*SafeLinkedList
}
//SafeLinkedList结构体,"container/list"
type SafeLinkedList struct {
sync.RWMutex
L *list.List
}
http.Start() HTTP API服务监听与处理
func init() {
configCommonRoutes() //组件公共API路由,可参考HBS模块
configInfoRoutes() //信息查询API路由
}
func Start() {
if !g.Config().Http.Enabled { //开启HTTP
return
}
addr := g.Config().Http.Listen //全局配置监听端品
if addr == "" {
return
}
s := &http.Server{
Addr: addr,
MaxHeaderBytes: 1 << 30,
}
log.Println("http listening", addr)
log.Fatalln(s.ListenAndServe())
}
func configInfoRoutes() {
// e.g. /strategy/lg-dinp-docker01.bj/cpu.idle
http.HandleFunc("/strategy/", func(w http.ResponseWriter, r *http.Request) {})
// e.g. /expression/net.port.listen/port=22
http.HandleFunc("/expression/", func(w http.ResponseWriter, r *http.Request) {})
//统计bigmap数据总长度
http.HandleFunc("/count", func(w http.ResponseWriter, r *http.Request) {})
//查看BigMap内指定的历史数据
http.HandleFunc("/history/", func(w http.ResponseWriter, r *http.Request) {})
}
rpc.Start() RPC服务注册与处理
func Start() {
if !g.Config().Rpc.Enabled {
return
}
addr := g.Config().Rpc.Listen
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
log.Fatalf("net.ResolveTCPAddr fail: %s", err)
}
listener, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
log.Fatalf("listen %s fail: %s", addr, err)
} else {
log.Println("rpc listening", addr)
}
rpc.Register(new(Judge)) //注册Judge
for {
conn, err := listener.Accept()
if err != nil {
log.Printf("listener.Accept occur error: %s", err)
continue
}
go rpc.ServeConn(conn)
}
}
## RPC Judge.Ping方法
type Judge int
func (this *Judge) Ping(req model.NullRpcRequest, resp *model.SimpleRpcResponse) error {
return nil
}
## RPC Judge.Send方法,Transfer使用此RPC方式上传数据
func (this *Judge) Send(items []*model.JudgeItem, resp *model.SimpleRpcResponse) error {
remain := g.Config().Remain //最大保留多少次历史记录,由全局配置文件定义
// 把当前时间的计算放在最外层,是为了减少获取时间时的系统调用开销
now := time.Now().Unix()
for _, item := range items {
exists := g.FilterMap.Exists(item.Metric) //判断缓存filtermap是否存在匹配Metric相关策略,无相关策略将不缓存此数据
if !exists {
continue
}
pk := item.PrimaryKey() //生成HASH key
store.HistoryBigMap[pk[0:2]].PushFrontAndMaintain(pk, item, remain, now) //缓存历史数据
}
return nil
}
//JudgeItem数据结构
type JudgeItem struct {
Endpoint string `json:"endpoint"`
Metric string `json:"metric"`
Value float64 `json:"value"`
Timestamp int64 `json:"timestamp"`
JudgeType string `json:"judgeType"`
Tags map[string]string `json:"tags"`
}
- 历史数据的缓存逻辑分析
#生成MD5 Hash值
func (this *JudgeItem) PrimaryKey() string {
return utils.Md5(utils.PK(this.Endpoint, this.Metric, this.Tags))
}
#基于HASH前两位做为索引KEY,存入HistoryBigMap且Judge计算处理
store.HistoryBigMap[pk[0:2]].PushFrontAndMaintain(pk, item, remain, now)
#JudgeItem Map缓存和Judge计算
func (this *JudgeItemMap) PushFrontAndMaintain(key string, val *model.JudgeItem, maxCount int, now int64) {
//JudgeItemMap.Get(HASH)找是否存在HASH KEY项列表,如果存在Push
//数据和校验数据有效性后Judge计算;如果不存在则基于HASH key创建
//JudgeItemMap并Push数据。
if linkedList, exists := this.Get(key); exists {
needJudge := linkedList.PushFrontAndMaintain(val, maxCount)
if needJudge {
Judge(linkedList, val, now) //【参考Judge逻辑分析】
}
} else {
NL := list.New()
NL.PushFront(val) //push into list
safeList := &SafeLinkedList{L: NL} //create safelist
this.Set(key, safeList) //save into JudgeItemMap[hash]
Judge(safeList, val, now) //【参考Judge逻辑分析】
}
}
# @return needJudge 如果是false不需要做judge,因为新上来的数据不合法
func (this *SafeLinkedList) PushFrontAndMaintain(v *model.JudgeItem, maxCount int) bool {
this.Lock()
defer this.Unlock()
sz := this.L.Len()
if sz > 0 {
// 新push上来的数据有可能重复了(等于以前ts),或者timestamp不对(小于以前ts),这种数据要丢掉
if v.Timestamp <= this.L.Front().Value.(*model.JudgeItem).Timestamp || v.Timestamp <= 0 {
return false
}
}
this.L.PushFront(v) //最新数据放置在列表首
sz++
if sz <= maxCount {
return true
}
//达到最大保存历史数据项条数后则清除尾部
del := sz - maxCount
for i := 0; i < del; i++ {
this.L.Remove(this.L.Back()) //删除列表尾记录
}
return true
}
- Judge报警判断逻辑分析
# Judge入口函数
func Judge(L *SafeLinkedList, firstItem *model.JudgeItem, now int64) {
CheckStrategy(L, firstItem, now) //Strategy处理
CheckExpression(L, firstItem, now) //Expression处理
}
## 策略检测及发送事件
func CheckStrategy(L *SafeLinkedList, firstItem *model.JudgeItem, now int64) {
key := fmt.Sprintf("%s/%s", firstItem.Endpoint, firstItem.Metric)
strategyMap := g.StrategyMap.Get()
strategies, exists := strategyMap[key]
if !exists {
return
}
for _, s := range strategies {
// 因为key仅仅是endpoint和metric,所以得到的strategies并不一定是与当前judgeItem相关的
// 比如lg-dinp-docker01.bj配置了两个proc.num的策略,一个name=docker,一个name=agent
// 所以此处要排除掉一部分
related := true
for tagKey, tagVal := range s.Tags {
if myVal, exists := firstItem.Tags[tagKey]; !exists || myVal != tagVal {
related = false
break
}
}
if !related {
continue
}
judgeItemWithStrategy(L, s, firstItem, now)
}
}
### 判断采集数据,如果匹配策略计算条件则发送报警事件
func judgeItemWithStrategy(L *SafeLinkedList, strategy model.Strategy, firstItem *model.JudgeItem, now int64) {
fn, err := ParseFuncFromString(strategy.Func, strategy.Operator, strategy.RightValue) //解析报警函数
if err != nil {
log.Printf("[ERROR] parse func %s fail: %v. strategy id: %d", strategy.Func, err, strategy.Id)
return
}
historyData, leftValue, isTriggered, isEnough := fn.Compute(L) //执行判断与计算
if !isEnough {
return
}
// 格式化事件信息
event := &model.Event{
Id: fmt.Sprintf("s_%d_%s", strategy.Id, firstItem.PrimaryKey()),
Strategy: &strategy,
Endpoint: firstItem.Endpoint,
LeftValue: leftValue,
EventTime: firstItem.Timestamp,
PushedTags: firstItem.Tags,
}
sendEventIfNeed(historyData, isTriggered, now, event, strategy.MaxStep) //依据执行判断结果决定发送报警事件
}
## 表达式检测及发送事件
func CheckExpression(L *SafeLinkedList, firstItem *model.JudgeItem, now int64) {
keys := buildKeysFromMetricAndTags(firstItem)
if len(keys) == 0 {
return
}
// expression可能会被多次重复处理,用此数据结构保证只被处理一次
handledExpression := make(map[int]struct{})
expressionMap := g.ExpressionMap.Get()
for _, key := range keys {
expressions, exists := expressionMap[key] //查询是否存在采集数据相对应的表达式
if !exists {
continue
}
related := filterRelatedExpressions(expressions, firstItem) //过滤与采集数据相关的Expression
for _, exp := range related {
if _, ok := handledExpression[exp.Id]; ok {
continue
}
handledExpression[exp.Id] = struct{}{}
judgeItemWithExpression(L, exp, firstItem, now)
}
}
}
### 过滤与采集数据相关的Expression
func filterRelatedExpressions(expressions []*model.Expression, firstItem *model.JudgeItem) []*model.Expression {
size := len(expressions)
if size == 0 {
return []*model.Expression{}
}
exps := make([]*model.Expression, 0, size)
for _, exp := range expressions {
related := true
itemTagsCopy := firstItem.Tags
// 注意:exp.Tags 中可能会有一个endpoint=xxx的tag
if _, ok := exp.Tags["endpoint"]; ok {
itemTagsCopy = copyItemTags(firstItem)
}
for tagKey, tagVal := range exp.Tags {
if myVal, exists := itemTagsCopy[tagKey]; !exists || myVal != tagVal {
related = false
break
}
}
if !related {
continue
}
exps = append(exps, exp) //[]exps
}
return exps
}
### 判断采集数据,如果匹配表达式计算条件则发送报警事件
func judgeItemWithExpression(L *SafeLinkedList, expression *model.Expression, firstItem *model.JudgeItem, now int64) {
fn, err := ParseFuncFromString(expression.Func, expression.Operator, expression.RightValue) //解析报警函数
if err != nil {
log.Printf("[ERROR] parse func %s fail: %v. expression id: %d", expression.Func, err, expression.Id)
return
}
historyData, leftValue, isTriggered, isEnough := fn.Compute(L) //执行判断与计算
if !isEnough {
return
}
// 格式化事件信息
event := &model.Event{
Id: fmt.Sprintf("e_%d_%s", expression.Id, firstItem.PrimaryKey()),
Expression: expression,
Endpoint: firstItem.Endpoint,
LeftValue: leftValue,
EventTime: firstItem.Timestamp,
PushedTags: firstItem.Tags,
}
sendEventIfNeed(historyData, isTriggered, now, event, expression.MaxStep) //依据执行判断结果决定发送报警事件
}
## 解析操作字符串,转化为报警函数
func ParseFuncFromString(str string, operator string, rightValue float64) (fn Function, err error) {
if str == "" {
return nil, fmt.Errorf("func can not be null!")
}
idx := strings.Index(str, "#") //以#为定位符
args, err := atois(str[idx+1 : len(str)-1]) //字位符后为参数
if err != nil {
return nil, err
}
switch str[:idx-1] { //定位符前为函数名
case "max":
fn = &MaxFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
case "min":
fn = &MinFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
case "all":
fn = &AllFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
case "sum":
fn = &SumFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
case "avg":
fn = &AvgFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
case "diff":
fn = &DiffFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
case "pdiff":
fn = &PDiffFunction{Limit: args[0], Operator: operator, RightValue: rightValue}
case "lookup":
fn = &LookupFunction{Num: args[0], Limit: args[1], Operator: operator, RightValue: rightValue}
default:
err = fmt.Errorf("not_supported_method")
}
return
}
## 判断与发送事件
func sendEventIfNeed(historyData []*model.HistoryData, isTriggered bool, now int64, event *model.Event, maxStep int) {
lastEvent, exists := g.LastEvents.Get(event.Id)
if isTriggered {
event.Status = "PROBLEM"
if !exists || lastEvent.Status[0] == 'O' {
// 本次触发了阈值,之前又没报过警,得产生一个报警Event
event.CurrentStep = 1
// 但是有些用户把最大报警次数配置成了0,相当于屏蔽了,要检查一下
if maxStep == 0 {
return
}
sendEvent(event) //发送事件
return
}
// 逻辑走到这里,说明之前Event是PROBLEM状态
if lastEvent.CurrentStep >= maxStep {
// 报警次数已经足够多,到达了最多报警次数了,不再报警
return
}
if historyData[len(historyData)-1].Timestamp <= lastEvent.EventTime {
// 产生过报警的点,就不能再使用来判断了,否则容易出现一分钟报一次的情况
// 只需要拿最后一个historyData来做判断即可,因为它的时间最老
return
}
if now-lastEvent.EventTime < g.Config().Alarm.MinInterval {
// 报警不能太频繁,两次报警之间至少要间隔MinInterval秒,否则就不能报警
return
}
event.CurrentStep = lastEvent.CurrentStep + 1
sendEvent(event)
} else {
// 如果LastEvent是Problem,报OK,否则啥都不做
if exists && lastEvent.Status[0] == 'P' {
event.Status = "OK" //状态转OK
event.CurrentStep = 1
sendEvent(event) //发送事件
}
}
}
### sendEvent将事件保存至Redis(预警的异步机制)
func sendEvent(event *model.Event) {
// update last event
g.LastEvents.Set(event.Id, event) //事件缓存
bs, err := json.Marshal(event) //Json序列化事件
if err != nil {
log.Printf("json marshal event %v fail: %v", event, err)
return
}
// send to redis
redisKey := fmt.Sprintf(g.Config().Alarm.QueuePattern, event.Priority()) //redis键名
rc := g.RedisConnPool.Get()
defer rc.Close()
rc.Do("LPUSH", redisKey, string(bs)) //LPUSH存储
}
- 报警函数分析
# Max
# 例如: max(#3)
# 对于最新的3个点,其最大值满足阈值条件则报警
type MaxFunction struct {
Function
Limit int //点数
Operator string //操作符
RightValue float64 //阀值
}
func (this MaxFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
vs, isEnough = L.HistoryData(this.Limit) //取指定的历史数据
if !isEnough {
return
}
max := vs[0].Value
//取最大值
for i := 1; i < this.Limit; i++ {
if max < vs[i].Value {
max = vs[i].Value
}
}
leftValue = max
isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue) //操作符判断返回true|false
return
}
# Min
# 如: min(#3)
# 对于最新的3个点,其最小值满足阈值条件则报警
type MinFunction struct {
Function
Limit int
Operator string
RightValue float64
}
func (this MinFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
vs, isEnough = L.HistoryData(this.Limit)
if !isEnough {
return
}
min := vs[0].Value
//取最小值
for i := 1; i < this.Limit; i++ {
if min > vs[i].Value {
min = vs[i].Value
}
}
leftValue = min
isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue) //操作符判断返回true|false
return
}
# All
# 如:all(#3)
# 最新的3个点都满足阈值条件则报警
type AllFunction struct {
Function
Limit int
Operator string
RightValue float64
}
func (this AllFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
vs, isEnough = L.HistoryData(this.Limit)
if !isEnough {
return
}
isTriggered = true
// 遁环判断操作条件
for i := 0; i < this.Limit; i++ {
isTriggered = checkIsTriggered(vs[i].Value, this.Operator, this.RightValue) //操作符判断返回true|false
if !isTriggered {
break
}
}
leftValue = vs[0].Value
return
}
# Lookup
# 如 lookup(#2,3)
# 最新的3个点中有2个满足条件则报警
type LookupFunction struct {
Function
Num int //条件数2
Limit int //点数3
Operator string
RightValue float64
}
func (this LookupFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
vs, isEnough = L.HistoryData(this.Limit)
if !isEnough {
return
}
leftValue = vs[0].Value
for n, i := 0, 0; i < this.Limit; i++ {
if checkIsTriggered(vs[i].Value, this.Operator, this.RightValue) {
n++ //满足条件则累计
if n == this.Num { //达到条件则触发
isTriggered = true
return
}
}
}
return
}
# Sum
# 如 sum(#3)
# 对于最新的3个点,其和满足阈值条件则报警
type SumFunction struct {
Function
Limit int
Operator string
RightValue float64
}
func (this SumFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
vs, isEnough = L.HistoryData(this.Limit)
if !isEnough {
return
}
sum := 0.0
for i := 0; i < this.Limit; i++ {
sum += vs[i].Value //累计和
}
leftValue = sum
isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue) //操作符判断返回true|false
return
}
# Avg
# 如 avg(#3)
# 对于最新的3个点,其平均值满足阈值条件则报警
type AvgFunction struct {
Function
Limit int
Operator string
RightValue float64
}
func (this AvgFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
vs, isEnough = L.HistoryData(this.Limit)
if !isEnough {
return
}
sum := 0.0
for i := 0; i < this.Limit; i++ {
sum += vs[i].Value //累计和
}
leftValue = sum / float64(this.Limit) //求平均
isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue) //操作符判断返回true|false
return
}
# Diff
# 如 diff(#3)
# 拿最新push上来的点(被减数),与历史最新的3个点(3个减数)相减,得到3个差
# 只要有一个差满足阈值条件则报警
type DiffFunction struct {
Function
Limit int
Operator string
RightValue float64
}
// 只要有一个点的diff触发阈值,就报警
func (this DiffFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
// 此处this.Limit要+1,因为通常说diff(#3),是当前点与历史的3个点相比较
// 然而最新点已经在linkedlist的第一个位置,所以……
vs, isEnough = L.HistoryData(this.Limit + 1)
if !isEnough {
return
}
if len(vs) == 0 {
isEnough = false
return
}
first := vs[0].Value //最新值
isTriggered = false
for i := 1; i < this.Limit+1; i++ {
// diff是当前值减去历史值
leftValue = first - vs[i].Value
isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue)
if isTriggered {
break //只要任何一次满足判断条件则返回True触发
}
}
return
}
# Pdiff
# 如:pdiff(#3)
# 拿最新push上来的点,与历史最新的3个点相减,得到3个差
# 再将3个差值分别除以减数,得到3个商值,只要有一个商值满足阈值则报警
type PDiffFunction struct {
Function
Limit int
Operator string
RightValue float64
}
func (this PDiffFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) {
vs, isEnough = L.HistoryData(this.Limit + 1)
if !isEnough {
return
}
if len(vs) == 0 {
isEnough = false
return
}
first := vs[0].Value
isTriggered = false
for i := 1; i < this.Limit+1; i++ {
if vs[i].Value == 0 {
continue
}
// 差/Value*100
leftValue = (first - vs[i].Value) / vs[i].Value * 100.0
isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue)
if isTriggered {
break
}
}
return
}
# 操作符解析与判断
func checkIsTriggered(leftValue float64, operator string, rightValue float64) (isTriggered bool) {
switch operator {
case "=", "==":
isTriggered = math.Abs(leftValue-rightValue) < 0.0001
case "!=":
isTriggered = math.Abs(leftValue-rightValue) > 0.0001
case "<":
isTriggered = leftValue < rightValue
case "<=":
isTriggered = leftValue <= rightValue
case ">":
isTriggered = leftValue > rightValue
case ">=":
isTriggered = leftValue >= rightValue
}
return
}
cron.SyncStrategies() 同步HBS策略和表达式
# 同步策略配置入口函数
func SyncStrategies() {
duration := time.Duration(g.Config().Hbs.Interval) * time.Second //全局配置间隔
for {
syncStrategies() //同步策略项配置函数调用
syncExpression() //同步表达式配置函数调用
syncFilter() //同步过滤器配置函数调用
time.Sleep(duration) //同步间隔
}
}
## RPC调用"Hbs.GetStrategies"HBS同步策略
func syncStrategies() {
var strategiesResponse model.StrategiesResponse
err := g.HbsClient.Call("Hbs.GetStrategies", model.NullRpcRequest{}, &strategiesResponse) //RPC调用HBS
if err != nil {
log.Println("[ERROR] Hbs.GetStrategies:", err)
return
}
rebuildStrategyMap(&strategiesResponse) //缓存
}
### 归整策略数据和缓存
func rebuildStrategyMap(strategiesResponse *model.StrategiesResponse) {
//缓存MAP格式 Key => Value
// || ||
// endpoint/metric => [strategy1, strategy2 ...]
m := make(map[string][]model.Strategy)
for _, hs := range strategiesResponse.HostStrategies {
hostname := hs.Hostname
//debug打印
if g.Config().Debug && hostname == g.Config().DebugHost {
log.Println(hostname, "strategies:")
bs, _ := json.Marshal(hs.Strategies)
fmt.Println(string(bs))
}
//数据归整至Map
for _, strategy := range hs.Strategies {
key := fmt.Sprintf("%s/%s", hostname, strategy.Metric)
if _, exists := m[key]; exists {
m[key] = append(m[key], strategy)
} else {
m[key] = []model.Strategy{strategy}
}
}
}
g.StrategyMap.ReInit(m) //初始化全局变量
}
## RPC调用"Hbs.GetExpressions"HBS同步表达式
func syncExpression() {
var expressionResponse model.ExpressionResponse
err := g.HbsClient.Call("Hbs.GetExpressions", model.NullRpcRequest{}, &expressionResponse) //RPC调用HBS
if err != nil {
log.Println("[ERROR] Hbs.GetExpressions:", err)
return
}
rebuildExpressionMap(&expressionResponse) //缓存
}
### 归整表达式数据和缓存
func rebuildExpressionMap(expressionResponse *model.ExpressionResponse) {
m := make(map[string][]*model.Expression)
//缓存MAP格式 Key => Value
// || ||
// metric/k=v => [expression1, expression2 ...]
for _, exp := range expressionResponse.Expressions {
for k, v := range exp.Tags {
key := fmt.Sprintf("%s/%s=%s", exp.Metric, k, v)
if _, exists := m[key]; exists {
m[key] = append(m[key], exp)
} else {
m[key] = []*model.Expression{exp}
}
}
}
g.ExpressionMap.ReInit(m) //初始化全局变量
}
## 构建同步过滤器map,以Metric为查询Key
func syncFilter() {
m := make(map[string]string) //缓存map
//M map[string][]model.Strategy
strategyMap := g.StrategyMap.Get() //获取同步的strategyMap
for _, strategies := range strategyMap {
for _, strategy := range strategies {
m[strategy.Metric] = strategy.Metric
}
} //迭代Metric
//M map[string][]*model.Expression
expressionMap := g.ExpressionMap.Get() //获取同步的expressionMap
for _, expressions := range expressionMap {
for _, expression := range expressions {
m[expression.Metric] = expression.Metric
}
} //迭代Metric
g.FilterMap.ReInit(m) //初始化全局变量
}
#### 全局(StrategyMap、ExpressionMap、FilterMap)变量和缓存初始化
var (
StrategyMap = &SafeStrategyMap{M: make(map[string][]model.Strategy)}
ExpressionMap = &SafeExpressionMap{M: make(map[string][]*model.Expression)}
FilterMap = &SafeFilterMap{M: make(map[string]string)}
)
func (this *SafeStrategyMap) ReInit(m map[string][]model.Strategy) {
this.Lock()
defer this.Unlock()
this.M = m
}
func (this *SafeExpressionMap) ReInit(m map[string][]*model.Expression) {
this.Lock()
defer this.Unlock()
this.M = m
}
func (this *SafeFilterMap) ReInit(m map[string]string) {
this.Lock()
defer this.Unlock()
this.M = m
}
####策略结构体定义
type Strategy struct {
Id int `json:"id"`
Metric string `json:"metric"`
Tags map[string]string `json:"tags"`
Func string `json:"func"` // e.g. max(#3) all(#3)
Operator string `json:"operator"` // e.g. < !=
RightValue float64 `json:"rightValue"` // critical value
MaxStep int `json:"maxStep"`
Priority int `json:"priority"`
Note string `json:"note"`
Tpl *Template `json:"tpl"` //模版
}
####表达式结构体定义
type Expression struct {
Id int `json:"id"`
Metric string `json:"metric"`
Tags map[string]string `json:"tags"`
Func string `json:"func"` // e.g. max(#3) all(#3)
Operator string `json:"operator"` // e.g. < !=
RightValue float64 `json:"rightValue"` // critical value
MaxStep int `json:"maxStep"`
Priority int `json:"priority"`
Note string `json:"note"`
ActionId int `json:"actionId"` //执行动作ID
}
cron.CleanStale()
# 定期清理任务运行入口
func CleanStale() {
for {
time.Sleep(time.Hour * 5)
cleanStale() //调用清理
}
}
##清理7天之前的历史过期数据
func cleanStale() {
before := time.Now().Unix() - 3600*24*7
arr := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"}
for i := 0; i < 16; i++ {
for j := 0; j < 16; j++ {
store.HistoryBigMap[arr[i]+arr[j]].CleanStale(before) //清理BigMap数据
}
}
}
#清理实现
func (this *JudgeItemMap) CleanStale(before int64) {
keys := []string{}
this.RLock()
for key, L := range this.M {
front := L.Front()
if front == nil {
continue
}
//迭代匹配时间戳,小于则过期
if front.Value.(*model.JudgeItem).Timestamp < before {
keys = append(keys, key)
}
}
this.RUnlock()
//批量清理
this.BatchDelete(keys)
}
func (this *JudgeItemMap) BatchDelete(keys []string) {
count := len(keys)
if count == 0 {
return
}
this.Lock()
defer this.Unlock()
for i := 0; i < count; i++ {
delete(this.M, keys[i]) //map delete条目
}
}
技术经验借鉴
- BigMap内存构造缓存历史数据机制与应用
- 针对报警计算funcation的设计模式之 "策略模式"应用
- SafeLinkedList并发安全的链表LIST操作实现
扩展学习
- github.com/garyburd/redigo/redis redis客户端
网友评论