美文网首页
OpenFalcon源码分析(Alarm组件)

OpenFalcon源码分析(Alarm组件)

作者: Xiao_Yang | 来源:发表于2018-09-23 09:20 被阅读0次

    Alarm版本

    VERSION = "0.2.0"

    Alarm组件功能

    judge把报警event写入redis,alarm从redis读取event,作相应处理 ( 发报警短信、邮件或callback http地址)。

    Alarm组件逻辑图

    alarm逻辑图

    Alarm配置操作

    Alt text

    main主入口分析

    // main is the entry point of the alarm process. It parses the command-line
    // flags, loads the configuration, initializes logging, the Redis pool, the
    // database ORM and the sender worker channels, then starts every background
    // loop in its own goroutine and blocks forever.
    func main() {
        cfg := flag.String("c", "cfg.json", "configuration file")
        version := flag.Bool("v", false, "show version")
        help := flag.Bool("h", false, "help")
        flag.Parse()
    
        // -v: print the version and exit.
        if *version {
            fmt.Println(g.VERSION)
            os.Exit(0)
        }
    
        // -h: print usage and exit.
        if *help {
            flag.Usage()
            os.Exit(0)
        }
    
        g.ParseConfig(*cfg)  // parse the global configuration file
    
        g.InitLog(g.Config().LogLevel)
        // Any log level other than "debug" switches gin to release mode.
        if g.Config().LogLevel != "debug" {
            gin.SetMode(gin.ReleaseMode)
        }
    
        g.InitRedisConnPool() // initialize the Redis connection pool
        model.InitDatabase() // initialize the database ORM
        cron.InitSenderWorker() // create the sender worker channels
    
        go http.Start()         // HTTP API server
        go cron.ReadHighEvent() // consume the high-priority event queues
        go cron.ReadLowEvent()  // consume the low-priority event queues
        
        go cron.CombineSms()    // merge SMS messages
        go cron.CombineMail()   // merge mail messages
        go cron.CombineIM()     // merge IM messages
        
        go cron.ConsumeIM()     // send IM notifications
        go cron.ConsumeSms()    // send SMS notifications
        go cron.ConsumeMail()   // send mail notifications
        
        go cron.CleanExpiredEvent() // purge expired events
    
        // On SIGINT or SIGTERM, close the Redis pool and exit.
        sigs := make(chan os.Signal, 1)
        signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
        go func() {
            <-sigs
            fmt.Println()
            g.RedisConnPool.Close()
            os.Exit(0)
        }()
    
        // Block the main goroutine; all work happens in the goroutines above.
        select {}   
    }
    

    g.ParseConfig(*cfg) 全局配置文件解析与其它模块相关(参考Judge模块)

    // GlobalConfig is the top-level configuration structure; each field is
    // mapped to a key of the JSON configuration file through its json tag.
    type GlobalConfig struct {
        LogLevel     string              `json:"log_level"`
        FalconPortal *FalconPortalConfig `json:"falcon_portal"`
        Http         *HttpConfig         `json:"http"`
        Redis        *RedisConfig        `json:"redis"`
        Api          *ApiConfig          `json:"api"`
        Worker       *WorkerConfig       `json:"worker"`
        // NOTE(review): this key is capitalized, unlike the other keys — confirm
        // against the shipped cfg.json before changing it.
        Housekeeper  *HousekeeperConfig  `json:"Housekeeper"`
    }
    

    model.InitDatabase() 初始化数据库ORM

    
    // InitDatabase registers the "default" database with the ORM using the
    // mysql driver and the FalconPortal address and pool settings from the
    // global configuration, registers the event models, and turns on ORM
    // debugging when the log level is "debug".
    func InitDatabase() {
        cfg := g.Config()
        portal := cfg.FalconPortal

        // Register the default database under the "mysql" driver name.
        orm.RegisterDataBase("default", "mysql", portal.Addr, portal.Idle, portal.Max)

        // Register the models backing event.Events and event.EventCases.
        orm.RegisterModel(new(event.Events), new(event.EventCases))

        // Debugging is only ever switched on here, never off.
        if cfg.LogLevel == "debug" {
            orm.Debug = true
        }
    }
    
    // Events is the ORM model for a single event record. Id is the primary
    // key and EventCaseId is a foreign-key relation to EventCases (per the
    // orm tags).
    type Events struct {
        Id          int         `json:"id" orm:"pk"`
        Step        int         `json:"step"`
        Cond        string      `json:"cond"`
        Status      int         `json:"status"`
        Timestamp   time.Time   `json:"timestamp"`
        EventCaseId *EventCases `json:"event_caseId" orm:"rel(fk)"`
    }
    
    // EventCases is the ORM model for an event case. Id is a string primary
    // key and Events is the reverse side of the Events.EventCaseId relation
    // (per the orm tags).
    type EventCases struct {
        // unique identifier
        Id       string `json:"id" orm:"pk"`
        Endpoint string `json:"endpoint"`
        Metric   string `json:"metric"`
        Func     string `json:"func"`
        // leftValue + operator + rightValue
        Cond          string    `json:"cond"`
        Note          string    `json:"note"`
        MaxStep       int       `json:"max_step"`
        CurrentStep   int       `json:"current_step"`
        Priority      int       `json:"priority"`
        Status        string    `json:"status"`
        Timestamp     time.Time `json:"start_at"`
        UpdateAt      time.Time `json:"update_at"`
        ProcessNote   int       `json:"process_note"`
        ProcessStatus string    `json:"process_status"`
        TplCreator    string    `json:"tpl_creator"`
        ExpressionId  int       `json:"expression_id"`
        StrategyId    int       `json:"strategy_id"`
        TemplateId    int       `json:"template_id"`
        // NOTE(review): the JSON key is spelled "evevnts"; it looks like a typo
        // for "events" — confirm no client depends on it before changing.
        Events        []*Events `json:"evevnts" orm:"reverse(many)"`
    }
    
    

    cron.InitSenderWorker() 初始化发送Channel

    // InitSenderWorker creates the three buffered worker channels. Each
    // channel's capacity comes from the matching field of the Worker
    // configuration section; the Send*List functions push one token into the
    // channel per in-flight send, so the capacity bounds concurrent sends.
    func InitSenderWorker() {
        limits := g.Config().Worker

        SmsWorkerChan = make(chan int, limits.Sms)
        MailWorkerChan = make(chan int, limits.Mail)
        IMWorkerChan = make(chan int, limits.IM)
    }
    

    http.Start() http API服务监听与处理

    // Start runs the HTTP API server. It returns immediately when the HTTP
    // section is disabled or no listen address is configured; otherwise it
    // registers the /version, /health and /workdir routes and serves until
    // the server stops.
    func Start() {
        if !g.Config().Http.Enabled {
            return
        }
        addr := g.Config().Http.Listen
        if addr == "" {
            return
        }

        r := gin.Default()
        r.GET("/version", Version) // version string
        r.GET("/health", Health)   // health check
        r.GET("/workdir", Workdir) // working directory

        // r.Run blocks while the server is serving, so the listening message
        // has to be logged before the call; previously it was only reached
        // after the server had already stopped.
        log.Println("http listening", addr)
        if err := r.Run(addr); err != nil {
            // Previously a failed bind was silently dropped.
            log.Println("http server stopped:", err)
        }
    }
    
    // Version responds with the alarm version string and status 200.
    func Version(c *gin.Context) {
        const statusOK = 200
        c.String(statusOK, g.VERSION)
    }
    
    // Health responds with the literal body "ok" and status 200.
    func Health(c *gin.Context) {
        const statusOK = 200
        c.String(statusOK, "ok")
    }
    
    // Workdir responds with the value of file.SelfDir() and status 200.
    func Workdir(c *gin.Context) {
        const statusOK = 200
        c.String(statusOK, file.SelfDir())
    }
    
    

    cron.ReadHighEvent() 处理高优先级事件队列

    
    #highQueues中配置的几个event队列中的事件是不会做报警合并的,因为那些是高优先级的报警,报警合并只是针对lowQueues中的事件。如果所有的事件都不想做报警合并,就把所有的event队列都配置到highQueues中即可
    
    #我们在配置报警策略的时候配置了报警级别,比如P0/P1/P2等等,每个级别的报警都会对应不同的redis队列。alarm去读取这个数据的时候,我们希望先读取P0的数据,再读取P1的数据,最后读取P5的数据,因为我们希望先处理优先级高的。于是用了redis的brpop指令。
    
    # 从Redis读取高优先级队列事件信息
    # 解析事件信息(action/callback/teams/user:phone、im、mail等)
    # 根据事件信息生成IM、SMS、MAIL内容存入Redis队列
    // ReadHighEvent loops forever over the configured high-priority queues:
    // it pops one event at a time and hands it to consume with isHigh=true.
    // It returns immediately when no high-priority queue is configured and
    // pauses for one second after a failed pop.
    func ReadHighEvent() {
        highQueues := g.Config().Redis.HighQueues
        if len(highQueues) == 0 {
            return
        }

        for {
            ev, err := popEvent(highQueues)
            if err == nil {
                consume(ev, true)
                continue
            }
            // Back off briefly before retrying a failed pop.
            time.Sleep(time.Second)
        }
    }
    
    ## 事件出列并将处理事件入库保存
    // popEvent pops one event from the given Redis queues with BRPOP, decodes
    // its JSON payload into a cmodel.Event, stores it through
    // eventmodel.InsertEvent, and returns it. Redis and decode failures are
    // logged and returned.
    func popEvent(queues []string) (*cmodel.Event, error) {
        // BRPOP arguments: every queue key, followed by a timeout of 0.
        args := make([]interface{}, 0, len(queues)+1)
        for _, name := range queues {
            args = append(args, name)
        }
        args = append(args, 0)

        conn := g.RedisConnPool.Get()
        defer conn.Close()

        reply, err := redis.Strings(conn.Do("BRPOP", args...))
        if err != nil {
            log.Errorf("get alarm event from redis fail: %v", err)
            return nil, err
        }

        // The payload is read from the second element of the reply.
        event := new(cmodel.Event)
        if err = json.Unmarshal([]byte(reply[1]), event); err != nil {
            log.Errorf("parse alarm event fail: %v", err)
            return nil, err
        }

        log.Debugf("pop event: %s", event.String())

        // Persist the decoded event to the database.
        eventmodel.InsertEvent(event)

        return event, nil
    }
    
    ## 处理事件 
    ## 
    // consume processes one popped event. Events without a positive action id,
    // or whose action cannot be fetched, are dropped. If the action has its
    // callback flag set the callback is handled first; the event is then
    // passed to the high- or low-priority handler according to isHigh.
    func consume(event *cmodel.Event, isHigh bool) {
        id := event.ActionId()
        if id <= 0 {
            return
        }

        // Look the action up through the API component.
        act := api.GetAction(id)
        if act == nil {
            return
        }

        if act.Callback == 1 {
            HandleCallback(event, act)
        }

        if !isHigh {
            consumeLowEvents(event, act)
            return
        }
        consumeHighEvents(event, act)
    }
    
    ### Callback配置处理
    // HandleCallback runs the callback configured on action for event. When
    // the action names notification teams (action.Uic), it can also enqueue
    // notifications before and after the callback, depending on the action's
    // BeforeCallback*/AfterCallback* flags. The after-callback notifications
    // carry the callback result message as their content.
    func HandleCallback(event *model.Event, action *api.Action) {
    
        teams := action.Uic
        phones := []string{}
        mails := []string{}
        ims := []string{}
    
        if teams != "" {
            // Resolve the teams' contacts and build the notification texts.
            phones, mails, ims = api.ParseTeams(teams)  
            smsContent := GenerateSmsContent(event)
            mailContent := GenerateMailContent(event)
            imContent := GenerateIMContent(event)
            if action.BeforeCallbackSms == 1 {  // enqueue SMS and IM before the callback
                redi.WriteSms(phones, smsContent)
                redi.WriteIM(ims, imContent)
            }
    
            if action.BeforeCallbackMail == 1 {// enqueue mail before the callback; the SMS text is the subject
                redi.WriteMail(mails, smsContent, mailContent)
            }
        }
    
        message := Callback(event, action)  // call the configured callback URL
    
        if teams != "" {
            if action.AfterCallbackSms == 1 { // enqueue SMS and IM after the callback
                redi.WriteSms(phones, message)
                redi.WriteIM(ims, message)
            }
    
            if action.AfterCallbackMail == 1 { // enqueue mail after the callback
                redi.WriteMail(mails, message, message)
            }
        }
    
    }
    
    #### 事件触发回调HTTPAPI URL处理
    // Callback issues an HTTP GET to action.Url with the event's fields as
    // request parameters and returns a one-line summary of the outcome. When
    // no URL is configured it returns "callback url is blank" without making
    // a request.
    func Callback(event *model.Event, action *api.Action) string {
        if action.Url == "" {
            return "callback url is blank"
        }

        // Flatten the pushed tags into "key:value" pairs joined by commas.
        // Ranging over an empty map and joining an empty slice both yield "".
        pairs := make([]string, 0)
        for name, value := range event.PushedTags {
            pairs = append(pairs, fmt.Sprintf("%s:%s", name, value))
        }
        tags := strings.Join(pairs, ",")

        // Build the request: 3s connect timeout, 20s read/write timeout.
        req := httplib.Get(action.Url).SetTimeout(3*time.Second, 20*time.Second)

        req.Param("endpoint", event.Endpoint)
        req.Param("metric", event.Metric())
        req.Param("status", event.Status)
        req.Param("step", fmt.Sprintf("%d", event.CurrentStep))
        req.Param("priority", fmt.Sprintf("%d", event.Priority()))
        req.Param("time", event.FormattedTime())
        req.Param("tpl_id", fmt.Sprintf("%d", event.TplId()))
        req.Param("exp_id", fmt.Sprintf("%d", event.ExpressionId()))
        req.Param("stra_id", fmt.Sprintf("%d", event.StrategyId()))
        req.Param("left_value", utils.ReadableFloat(event.LeftValue))
        req.Param("tags", tags)

        // Execute the request and read the response body as a string.
        resp, e := req.String()

        outcome := "success"
        if e != nil {
            log.Errorf("callback fail, action:%v, event:%s, error:%s", action, event.String(), e.Error())
            outcome = fmt.Sprintf("fail:%s", e.Error())
        }

        log.Debugf("callback to url:%s, event:%s, resp:%s", action.Url, event.String(), resp)
        return fmt.Sprintf("curl %s %s. resp: %s", action.Url, outcome, resp)
    }
    
    
    ### 消费高优先级队列事件
    ### 高优先级的不做报警合并
    ### 低优先级处理逻辑相同
    // consumeHighEvents enqueues notifications for an event directly on the
    // SMS, IM and mail queues. Nothing is sent when the action names no
    // teams. SMS is only enqueued for priorities below 3; IM and mail are
    // always enqueued, with the SMS text used as the mail subject.
    func consumeHighEvents(event *cmodel.Event, action *api.Action) {
        if action.Uic == "" {
            return
        }

        // Resolve the teams' contact details through the API component.
        phones, mails, ims := api.ParseTeams(action.Uic)

        smsText := GenerateSmsContent(event)
        mailText := GenerateMailContent(event)
        imText := GenerateIMContent(event)

        // Only P0-P2 events trigger an SMS.
        if event.Priority() < 3 {
            redi.WriteSms(phones, smsText)
        }

        redi.WriteIM(ims, imText)
        redi.WriteMail(mails, smsText, mailText)
    }
    
    
    #### 通过API查询解析维护人员组成员phones, emails, IM
    func ParseTeams(teams string) ([]string, []string, []string) {
        if teams == "" {
            return []string{}, []string{}, []string{}
        }
    
        userMap := GetUsers(teams)  //API查询获取用户信息map
        phoneSet := set.NewStringSet()
        mailSet := set.NewStringSet()
        imSet := set.NewStringSet()
        for _, user := range userMap {
            if user.Phone != "" {
                phoneSet.Add(user.Phone)
            }
            if user.Email != "" {
                mailSet.Add(user.Email)
            }
            if user.IM != "" {
                imSet.Add(user.IM)
            }
        }
        return phoneSet.ToSlice(), mailSet.ToSlice(), imSet.ToSlice()
    }
    ###### 通过成员组信息查询用户
    // GetUsers returns the members of every team in the comma-separated
    // teams string, keyed by user name, so a user who belongs to several
    // teams appears once. Empty team names are skipped.
    func GetUsers(teams string) map[string]*uic.User {
        byName := make(map[string]*uic.User)

        for _, name := range strings.Split(teams, ",") {
            if name == "" {
                continue
            }
            // Ranging over a nil result is a no-op, so teams whose lookup
            // yields nothing contribute no entries.
            for _, member := range UsersOf(name) {
                byName[member.Name] = member
            }
        }

        return byName
    }
    // UsersOf returns the members of team. A non-nil result from the API is
    // stored in the Users cache and returned; when the API call yields nil,
    // the value held by the cache is returned instead.
    func UsersOf(team string) []*uic.User {
        if fresh := CurlUic(team); fresh != nil {
            Users.Set(team, fresh)
            return fresh
        }
        return Users.Get(team)
    }
    ###### API组件接口,CURL HTTP访问与响应数据处理
    // CurlUic fetches the members of team from the API component with an
    // authenticated HTTP GET. It returns an empty non-nil slice for an empty
    // team name and nil when the request or the JSON decoding fails.
    func CurlUic(team string) []*uic.User {
        if team == "" {
            return []*uic.User{}
        }

        endpoint := fmt.Sprintf("%s/api/v1/team/name/%s", g.Config().Api.PlusApi, team)
        req := httplib.Get(endpoint).SetTimeout(2*time.Second, 10*time.Second)

        // The Apitoken header carries a JSON object with the caller name and
        // the configured token. The marshal error is ignored as before.
        credentials := map[string]string{
            "name": "falcon-alarm",
            "sig":  g.Config().Api.PlusApiToken,
        }
        token, _ := json.Marshal(credentials)
        req.Header("Apitoken", string(token))

        // Execute the request and decode the JSON response.
        var out APIGetTeamOutput
        if err := req.ToJson(&out); err != nil {
            log.Errorf("curl %s fail: %v", endpoint, err)
            return nil
        }

        return out.Users
    }
    
    
    ####依据事件信息生成SMS内容
    #### IM/MAIL处理逻辑相同
    // GenerateSmsContent returns the SMS text for event; it delegates to
    // BuildCommonSMSContent.
    func GenerateSmsContent(event *model.Event) string {
        content := BuildCommonSMSContent(event)
        return content
    }
    // BuildCommonSMSContent formats event as a bracketed one-line summary:
    // priority, status, endpoint, an empty bracket, then note, function,
    // metric, sorted tags and the "left operator right" comparison, and
    // finally the current step and the formatted time.
    func BuildCommonSMSContent(event *model.Event) string {
        const layout = "[P%d][%s][%s][][%s %s %s %s %s%s%s][O%d %s]"

        priority := event.Priority()
        note := event.Note()
        fn := event.Func()
        metric := event.Metric()
        tags := utils.SortedTags(event.PushedTags)
        left := utils.ReadableFloat(event.LeftValue)
        operator := event.Operator()
        right := utils.ReadableFloat(event.RightValue())
        when := event.FormattedTime()

        return fmt.Sprintf(layout,
            priority, event.Status, event.Endpoint,
            note, fn, metric, tags,
            left, operator, right,
            event.CurrentStep, when,
        )
    }
    ##### WriteIM WriteSms
    func WriteIM(tos []string, content string) {
        if len(tos) == 0 {
            return
        }
        im := &model.IM{Tos: strings.Join(tos, ","), Content: content}
        WriteIMModel(im)   //写入IM队列
    }
    ######
    // WriteIMModel serializes im to JSON and pushes it onto the IM Redis
    // queue (IM_QUEUE_NAME). A nil message is ignored; a marshal failure is
    // logged and the message is dropped.
    //
    // The article notes that the mail and SMS writers do the same with
    // MAIL_QUEUE_NAME and SMS_QUEUE_NAME.
    func WriteIMModel(im *model.IM) {
        if im == nil {
            return
        }

        payload, err := json.Marshal(im)
        if err != nil {
            log.Error(err)
            return
        }

        log.Debugf("write im to queue, im:%v, queue:%s", im, IM_QUEUE_NAME)
        lpush(IM_QUEUE_NAME, string(payload))
    }
    
    

    cron.ReadLowEvent() 同ReadHighEvent,处理低优先级队列(Redis keys)

    // ReadLowEvent loops forever over the configured low-priority queues: it
    // pops one event at a time and hands it to consume with isHigh=false. It
    // returns immediately when no low-priority queue is configured and pauses
    // for one second after a failed pop.
    func ReadLowEvent() {
        lowQueues := g.Config().Redis.LowQueues
        if len(lowQueues) == 0 {
            return
        }

        for {
            ev, err := popEvent(lowQueues)
            if err == nil {
                consume(ev, false)
                continue
            }
            // Back off briefly before retrying a failed pop.
            time.Sleep(time.Second)
        }
    }
    
    

    cron.CombineSms() 合并sms内容

    // CombineSms runs combineSms forever, sleeping one minute before each
    // pass.
    func CombineSms() {
        const pause = time.Minute
        for {
            time.Sleep(pause)
            combineSms()
        }
    }
    
    ## 合并SMS信息
    func combineSms() {
        dtos := popAllSmsDto()   //SMS队列"/queue/user/sms" RPOP所有事件短信内容信息
        count := len(dtos)
        if count == 0 {
            return
        }
    
        //整理事件信息至内存map
        //key:Priority Status Phone Metric
        //value: pop原事件信息
        dtoMap := make(map[string][]*SmsDto) 
        for i := 0; i < count; i++ {
            key := fmt.Sprintf("%d%s%s%s", dtos[i].Priority, dtos[i].Status, dtos[i].Phone, dtos[i].Metric)
            if _, ok := dtoMap[key]; ok {
                dtoMap[key] = append(dtoMap[key], dtos[i])
            } else {
                dtoMap[key] = []*SmsDto{dtos[i]}
            }
        }
    
        //如果同一个KEY,有多条SMS内容则合并成一条提供link链接
        for _, arr := range dtoMap {
            size := len(arr)  
            if size == 1 {  
                redi.WriteSms([]string{arr[0].Phone}, arr[0].Content)
                continue
            }
    
            // 如果有多条,把多个sms内容写入数据库,只给用户提供一个链接
            contentArr := make([]string, size)
            for i := 0; i < size; i++ {
                contentArr[i] = arr[i].Content
            }
            content := strings.Join(contentArr, ",,")
    
            first := arr[0].Content
            t := strings.Split(first, "][")
            eg := ""
            if len(t) >= 3 {
                eg = t[2]
            }
    
            path, err := api.LinkToSMS(content) //API组件调用links
            sms := ""
            if err != nil || path == "" {
                sms = fmt.Sprintf("[P%d][%s] %d %s.  e.g. %s. detail in email", arr[0].Priority, arr[0].Status, size, arr[0].Metric, eg)
                log.Error("get short link fail", err)
            } else {
                sms = fmt.Sprintf("[P%d][%s] %d %s e.g. %s %s/portal/links/%s ",
                    arr[0].Priority, arr[0].Status, size, arr[0].Metric, eg, g.Config().Api.Dashboard, path)  //多条合并带连接地址
                log.Debugf("combined sms is:%s", sms)
            }
    
            redi.WriteSms([]string{arr[0].Phone}, sms)  //SMS重入队"/sms"  }
    }
    
    ### 所有SMS内容出队
    // popAllSmsDto drains the configured user SMS queue with repeated RPOP
    // calls and returns every entry that decodes into an SmsDto. It stops at
    // the first Redis error; redis.ErrNil is treated as the normal end of the
    // queue and is not logged. Empty replies and undecodable payloads are
    // skipped.
    func popAllSmsDto() []*SmsDto {
        queue := g.Config().Redis.UserSmsQueue
        collected := []*SmsDto{}

        conn := g.RedisConnPool.Get()
        defer conn.Close()

        for {
            raw, err := redis.String(conn.Do("RPOP", queue))
            if err != nil {
                if err != redis.ErrNil {
                    log.Error("get SmsDto fail", err)
                }
                return collected
            }
            if raw == "" || raw == "nil" {
                continue
            }

            dto := new(SmsDto)
            if err = json.Unmarshal([]byte(raw), dto); err != nil {
                log.Errorf("json unmarshal SmsDto: %s fail: %v", raw, err)
                continue
            }
            collected = append(collected, dto)
        }
    }
    
    
    

    cron.CombineMail() 合并MAIL内容(同上SMS)

    // CombineMail runs combineMail forever, sleeping one minute before each
    // pass.
    func CombineMail() {
        const pause = time.Minute
        for {
            time.Sleep(pause)
            combineMail()
        }
    }
    
    // combineMail drains the pending per-user mail entries, groups them by
    // priority, status, e-mail address and metric, and re-enqueues one mail
    // per group. A single-entry group is forwarded unchanged; a larger group
    // becomes one mail with a summary subject and all bodies joined by CRLF.
    func combineMail() {
        pending := popAllMailDto()
        if len(pending) == 0 {
            return
        }

        groups := make(map[string][]*MailDto)
        for _, dto := range pending {
            key := fmt.Sprintf("%d%s%s%s", dto.Priority, dto.Status, dto.Email, dto.Metric)
            groups[key] = append(groups[key], dto)
        }

        // The results are written back to Redis rather than sent from here;
        // the original note warns that sending here would easily lose data
        // when alarm restarts.
        for _, group := range groups {
            head := group[0]
            recipients := []string{head.Email}

            if len(group) == 1 {
                redi.WriteMail(recipients, head.Subject, head.Content)
                continue
            }

            subject := fmt.Sprintf("[P%d][%s] %d %s", head.Priority, head.Status, len(group), head.Metric)

            bodies := make([]string, 0, len(group))
            for _, dto := range group {
                bodies = append(bodies, dto.Content)
            }
            content := strings.Join(bodies, "\r\n")

            log.Debugf("combined mail subject:%s, content:%s", subject, content)
            redi.WriteMail(recipients, subject, content)
        }
    }
    
    

    cron.CombineIM() 合并IM内容(同上SMS)

    // CombineIM runs combineIM forever, sleeping one minute before each pass.
    func CombineIM() {
        const pause = time.Minute
        for {
            time.Sleep(pause)
            combineIM()
        }
    }
    
    // combineIM drains the pending per-user IM entries, groups them by
    // priority, status, IM id and metric, and re-enqueues one IM per group.
    // A single-entry group is forwarded unchanged. A larger group is
    // collapsed into one summary message that carries a link to the full
    // content, or a "detail in email" hint when the link cannot be created.
    func combineIM() {
        pending := popAllImDto()
        if len(pending) == 0 {
            return
        }

        groups := make(map[string][]*ImDto)
        for _, dto := range pending {
            key := fmt.Sprintf("%d%s%s%s", dto.Priority, dto.Status, dto.IM, dto.Metric)
            groups[key] = append(groups[key], dto)
        }

        for _, group := range groups {
            head := group[0]
            total := len(group)

            if total == 1 {
                redi.WriteIM([]string{head.IM}, head.Content)
                continue
            }

            // Several entries: store the joined content behind a link and
            // send the user a single summary message.
            bodies := make([]string, 0, total)
            for _, dto := range group {
                bodies = append(bodies, dto.Content)
            }
            content := strings.Join(bodies, ",,")

            // The third "]["-separated field of the first message serves as
            // the example in the summary.
            eg := ""
            if fields := strings.Split(head.Content, "]["); len(fields) >= 3 {
                eg = fields[2]
            }

            var chat string
            path, err := api.LinkToSMS(content)
            if err == nil && path != "" {
                chat = fmt.Sprintf("[P%d][%s] %d %s e.g. %s %s/portal/links/%s ",
                    head.Priority, head.Status, total, head.Metric, eg, g.Config().Api.Dashboard, path)
                log.Debugf("combined im is:%s", chat)
            } else {
                chat = fmt.Sprintf("[P%d][%s] %d %s.  e.g. %s. detail in email", head.Priority, head.Status, total, head.Metric, eg)
                log.Error("create short link fail", err)
            }

            // Re-enqueue the combined message on the IM queue.
            redi.WriteIM([]string{head.IM}, chat)
        }
    }
    
    

    cron.ConsumeSms() 发送SMS事件通知

    # 发送SMS事件通知入口函数
    // ConsumeSms loops forever: it drains the SMS queue and dispatches the
    // batch for sending, sleeping 200ms whenever the queue is empty.
    func ConsumeSms() {
        for {
            if batch := redi.PopAllSms(); len(batch) != 0 {
                SendSmsList(batch)
                continue
            }
            time.Sleep(time.Millisecond * 200)
        }
    }
    ## Pop所有'/sms'队列信息
    // PopAllSms drains the SMS queue (SMS_QUEUE_NAME) with repeated RPOP
    // calls and returns every entry that decodes into a model.Sms. It stops
    // at the first Redis error; redis.ErrNil is treated as the normal end of
    // the queue and is not logged. Empty replies and undecodable payloads are
    // skipped.
    func PopAllSms() []*model.Sms {
        collected := []*model.Sms{}

        conn := g.RedisConnPool.Get()
        defer conn.Close()

        for {
            raw, err := redis.String(conn.Do("RPOP", SMS_QUEUE_NAME))
            if err != nil {
                if err != redis.ErrNil {
                    log.Error(err)
                }
                return collected
            }
            if raw == "" || raw == "nil" {
                continue
            }

            msg := new(model.Sms)
            if err = json.Unmarshal([]byte(raw), msg); err != nil {
                log.Error(err, raw)
                continue
            }
            collected = append(collected, msg)
        }
    }
    
    ## 处理SMS发送
    // SendSmsList sends every SMS in L on its own goroutine. A token is
    // pushed into SmsWorkerChan before each goroutine starts, so the loop
    // blocks once the channel's capacity is reached.
    func SendSmsList(L []*model.Sms) {
        for i := range L {
            SmsWorkerChan <- 1
            go SendSms(L[i])
        }
    }
    
    
    ###发送SMS实现函数
    // SendSms posts one SMS to the configured SMS API with the "tos" and
    // "content" form parameters, logs a failure if the request errors, and
    // releases its SmsWorkerChan token on return.
    func SendSms(sms *model.Sms) {
        // Free the worker slot taken by SendSmsList.
        defer func() {
            <-SmsWorkerChan
        }()

        endpoint := g.Config().Api.Sms
        req := httplib.Post(endpoint).SetTimeout(5*time.Second, 30*time.Second)
        req.Param("tos", sms.Tos)
        req.Param("content", sms.Content)

        body, err := req.String()
        if err != nil {
            log.Errorf("send sms fail, tos:%s, cotent:%s, error:%v", sms.Tos, sms.Content, err)
        }

        log.Debugf("send sms:%v, resp:%v, url:%s", sms, body, endpoint)
    }
    
    

    cron.ConsumeIM() 发送IM事件通知(同上SMS)

    // ConsumeIM loops forever: it drains the IM queue and dispatches the
    // batch for sending, sleeping 200ms whenever the queue is empty.
    func ConsumeIM() {
        for {
            if batch := redi.PopAllIM(); len(batch) != 0 {
                SendIMList(batch)
                continue
            }
            time.Sleep(time.Millisecond * 200)
        }
    }
    
    // SendIMList sends every IM in L on its own goroutine. A token is pushed
    // into IMWorkerChan before each goroutine starts, so the loop blocks once
    // the channel's capacity is reached.
    func SendIMList(L []*model.IM) {
        for i := range L {
            IMWorkerChan <- 1
            go SendIM(L[i])
        }
    }
    
    // SendIM posts one IM message to the configured IM API with the "tos"
    // and "content" form parameters, logs a failure if the request errors,
    // and releases its IMWorkerChan token on return.
    func SendIM(im *model.IM) {
        // Free the worker slot taken by SendIMList.
        defer func() {
            <-IMWorkerChan
        }()

        endpoint := g.Config().Api.IM
        req := httplib.Post(endpoint).SetTimeout(5*time.Second, 30*time.Second)
        req.Param("tos", im.Tos)
        req.Param("content", im.Content)

        body, err := req.String()
        if err != nil {
            log.Errorf("send im fail, tos:%s, cotent:%s, error:%v", im.Tos, im.Content, err)
        }

        log.Debugf("send im:%v, resp:%v, url:%s", im, body, endpoint)
    }
    
    

    cron.ConsumeMail() 发送MAIL事件通知(同上SMS)

    // ConsumeMail loops forever: it drains the mail queue and dispatches the
    // batch for sending, sleeping 200ms whenever the queue is empty.
    func ConsumeMail() {
        for {
            if batch := redi.PopAllMail(); len(batch) != 0 {
                SendMailList(batch)
                continue
            }
            time.Sleep(time.Millisecond * 200)
        }
    }
    
    // SendMailList sends every mail in L on its own goroutine. A token is
    // pushed into MailWorkerChan before each goroutine starts, so the loop
    // blocks once the channel's capacity is reached.
    func SendMailList(L []*model.Mail) {
        for i := range L {
            MailWorkerChan <- 1
            go SendMail(L[i])
        }
    }
    
    // SendMail posts one mail to the configured mail API with the "tos",
    // "subject" and "content" form parameters, logs a failure if the request
    // errors, and releases its MailWorkerChan token on return.
    func SendMail(mail *model.Mail) {
        // Free the worker slot taken by SendMailList.
        defer func() {
            <-MailWorkerChan
        }()

        endpoint := g.Config().Api.Mail
        req := httplib.Post(endpoint).SetTimeout(5*time.Second, 30*time.Second)
        req.Param("tos", mail.Tos)
        req.Param("subject", mail.Subject)
        req.Param("content", mail.Content)

        body, err := req.String()
        if err != nil {
            log.Errorf("send mail fail, receiver:%s, subject:%s, cotent:%s, error:%v", mail.Tos, mail.Subject, mail.Content, err)
        }

        log.Debugf("send mail:%v, resp:%v, url:%s", mail, body, endpoint)
    }
    
    

    cron.CleanExpiredEvent() //清理过旧的事件信息

    // CleanExpiredEvent loops forever, once every 60 seconds asking
    // eventmodel.DeleteEventOlder to delete events older than the configured
    // retention period, at most one configured batch per pass. The
    // configuration is re-read on every pass.
    func CleanExpiredEvent() {
        for {
            // Retention period in days and rows per delete batch (the
            // original notes give defaults of 7 days and 100 rows).
            days := g.Config().Housekeeper.EventRetentionDays
            batch := g.Config().Housekeeper.EventDeleteBatch

            cutoff := time.Now().Add(time.Duration(-days*24) * time.Hour)
            eventmodel.DeleteEventOlder(cutoff, batch)

            time.Sleep(time.Second * 60)
        }
    }
    //清理数据库事件信息
    // DeleteEventOlder deletes up to limit rows from the events table whose
    // timestamp is earlier than before, and logs either the failure or the
    // number of rows affected.
    func DeleteEventOlder(before time.Time, limit int) {
        cutoff := before.Format(timeLayout)

        result, err := orm.NewOrm().Raw(`delete from events where timestamp<? limit ?`, cutoff, limit).Exec()
        if err != nil {
            log.Errorf("delete event older than %v fail, error:%v", cutoff, err)
            return
        }

        // The RowsAffected error is ignored, as before; the count is only
        // used for the debug log line.
        affected, _ := result.RowsAffected()
        log.Debugf("delete event older than %v, rows affected:%v", cutoff, affected)
    }
    
    

    技术经验借鉴

    • 报警合并机制实现
    • 事件异步化和队列实现

    扩展学习

    相关文章

      网友评论

          本文标题:OpenFalcon源码分析(Alarm组件)

          本文链接:https://www.haomeiwen.com/subject/ldrgnftx.html