Alarm版本
VERSION = "0.2.0"
Alarm组件功能
judge把报警event写入redis,alarm从redis读取event并做相应处理(发送报警短信、邮件、IM,或回调callback HTTP地址)。
Alarm组件逻辑图
Alarm配置操作
main主入口分析
// main is the entry point of the alarm process.
//
// It parses the command-line flags, loads the global configuration,
// initializes logging, the Redis connection pool, the ORM and the sender
// worker channels, and then starts every background worker in its own
// goroutine. The main goroutine then waits for SIGINT/SIGTERM, closes the
// Redis pool and exits with status 0.
func main() {
	cfg := flag.String("c", "cfg.json", "configuration file")
	version := flag.Bool("v", false, "show version")
	help := flag.Bool("h", false, "help")
	flag.Parse()

	// -v wins over -h; both terminate the process immediately.
	switch {
	case *version:
		fmt.Println(g.VERSION)
		os.Exit(0)
	case *help:
		flag.Usage()
		os.Exit(0)
	}

	g.ParseConfig(*cfg) // parse the global configuration file
	g.InitLog(g.Config().LogLevel)
	if g.Config().LogLevel != "debug" {
		gin.SetMode(gin.ReleaseMode)
	}

	g.InitRedisConnPool()   // Redis connection pool
	model.InitDatabase()    // database ORM
	cron.InitSenderWorker() // sender worker channels

	// Long-running workers, each started in its own goroutine.
	workers := []func(){
		http.Start,             // HTTP API listener
		cron.ReadHighEvent,     // high-priority event queues
		cron.ReadLowEvent,      // low-priority event queues
		cron.CombineSms,        // merge SMS content
		cron.CombineMail,       // merge mail content
		cron.CombineIM,         // merge IM content
		cron.ConsumeIM,         // send IM notifications
		cron.ConsumeSms,        // send SMS notifications
		cron.ConsumeMail,       // send mail notifications
		cron.CleanExpiredEvent, // purge expired events
	}
	for _, worker := range workers {
		go worker()
	}

	// Block until SIGINT/SIGTERM arrives, then release resources and exit.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	<-sigs
	fmt.Println()
	g.RedisConnPool.Close()
	os.Exit(0)
}
g.ParseConfig(*cfg) 全局配置文件解析与其它模块相关(参考Judge模块)
// GlobalConfig is the root of the alarm configuration file; each field is
// decoded from the JSON key named in its tag.
type GlobalConfig struct {
	// LogLevel selects the log level; "debug" additionally enables ORM debug
	// output and keeps gin out of release mode.
	LogLevel string `json:"log_level"`
	// FalconPortal holds the database settings (Addr, Idle, Max) used when
	// registering the ORM database.
	FalconPortal *FalconPortalConfig `json:"falcon_portal"`
	// Http holds the HTTP API settings (Enabled, Listen).
	Http *HttpConfig `json:"http"`
	// Redis holds the Redis settings, including the event and user queues.
	Redis *RedisConfig `json:"redis"`
	// Api holds the endpoints of the external services alarm calls.
	Api *ApiConfig `json:"api"`
	// Worker holds the buffer sizes of the IM/SMS/mail worker channels.
	Worker *WorkerConfig `json:"worker"`
	// Housekeeper holds the event retention and delete-batch settings.
	Housekeeper *HousekeeperConfig `json:"Housekeeper"`
}
model.InitDatabase() 初始化数据库ORM
// InitDatabase registers the "default" database and the event models with
// the ORM, using the falcon_portal section of the global configuration.
// ORM debug output is switched on when the log level is "debug".
func InitDatabase() {
	cfg := g.Config()
	portal := cfg.FalconPortal

	// Register the default database under the "mysql" driver name.
	orm.RegisterDataBase("default", "mysql", portal.Addr, portal.Idle, portal.Max)

	// Register the object-relational mappings for Events and EventCases.
	orm.RegisterModel(&event.Events{}, &event.EventCases{})

	if cfg.LogLevel == "debug" {
		orm.Debug = true
	}
}
// Events is the ORM model for a single stored alarm event.
type Events struct {
	// Id is the primary key.
	Id int `json:"id" orm:"pk"`
	// Step is the step number recorded for this event.
	Step int `json:"step"`
	// Cond is the condition text recorded for this event.
	Cond      string    `json:"cond"`
	Status    int       `json:"status"`
	Timestamp time.Time `json:"timestamp"`
	// EventCaseId is the foreign-key relation to the owning EventCases row.
	EventCaseId *EventCases `json:"event_caseId" orm:"rel(fk)"`
}
// EventCases is the ORM model for an alarm case; individual Events point
// back to it through a foreign key.
type EventCases struct {
	// Id is the primary key (unique).
	Id       string `json:"id" orm:"pk"`
	Endpoint string `json:"endpoint"`
	Metric   string `json:"metric"`
	Func     string `json:"func"`
	// Cond is the condition: leftValue + operator + rightValue.
	Cond        string `json:"cond"`
	Note        string `json:"note"`
	MaxStep     int    `json:"max_step"`
	CurrentStep int    `json:"current_step"`
	Priority    int    `json:"priority"`
	Status      string `json:"status"`
	// Timestamp is serialized under the JSON key "start_at".
	Timestamp     time.Time `json:"start_at"`
	UpdateAt      time.Time `json:"update_at"`
	ProcessNote   int       `json:"process_note"`
	ProcessStatus string    `json:"process_status"`
	TplCreator    string    `json:"tpl_creator"`
	ExpressionId  int       `json:"expression_id"`
	StrategyId    int       `json:"strategy_id"`
	TemplateId    int       `json:"template_id"`
	// Events is the reverse side of Events.EventCaseId.
	// NOTE(review): the JSON key is spelled "evevnts"; it is kept unchanged
	// because it is part of the serialized format.
	Events []*Events `json:"evevnts" orm:"reverse(many)"`
}
cron.InitSenderWorker() 初始化发送Channel
// InitSenderWorker creates the buffered channels that bound how many IM,
// SMS and mail sends may be in flight at once. Each channel's capacity
// comes from the worker section of the global configuration.
func InitSenderWorker() {
	limits := g.Config().Worker

	SmsWorkerChan = make(chan int, limits.Sms)
	MailWorkerChan = make(chan int, limits.Mail)
	IMWorkerChan = make(chan int, limits.IM)
}
http.Start() http API服务监听与处理
// Start runs the HTTP API server and blocks until it stops.
//
// It returns immediately when the HTTP section is disabled or has no listen
// address. Otherwise it registers the /version, /health and /workdir routes
// and serves on the configured address.
func Start() {
	httpCfg := g.Config().Http
	if !httpCfg.Enabled {
		return
	}

	addr := httpCfg.Listen
	if addr == "" {
		return
	}

	r := gin.Default()
	r.GET("/version", Version) // version string
	r.GET("/health", Health)   // health probe
	r.GET("/workdir", Workdir) // working directory

	// Run blocks for the lifetime of the server, so announce the listener
	// before calling it; anything after Run only executes once serving fails.
	log.Println("http listening", addr)
	if err := r.Run(addr); err != nil {
		log.Println("http server stopped:", err)
	}
}
// Version replies with the alarm version string and HTTP status 200.
func Version(c *gin.Context) {
	const statusOK = 200
	c.String(statusOK, g.VERSION)
}
// Health replies "ok" with HTTP status 200.
func Health(c *gin.Context) {
	const statusOK = 200
	c.String(statusOK, "ok")
}
// Workdir replies with the value of file.SelfDir() and HTTP status 200.
func Workdir(c *gin.Context) {
	const statusOK = 200
	dir := file.SelfDir()
	c.String(statusOK, dir)
}
cron.ReadHighEvent() 处理高优先级事件队列
#highQueues中配置的几个event队列中的事件是不会做报警合并的,因为那些是高优先级的报警,报警合并只是针对lowQueues中的事件。如果所有的事件都不想做报警合并,就把所有的event队列都配置到highQueues中即可
#我们在配置报警策略的时候配置了报警级别,比如P0/P1/P2等等,每个级别的报警都会对应不同的redis队列。alarm去读取这些数据的时候,我们希望先读取P0的数据,再读取P1的数据,最后读取P5的数据,因为我们希望先处理优先级高的。于是用了redis的brpop指令:它按给定key的顺序依次检查各队列,并从第一个非空队列中弹出元素。
# 从Redis读取高优先级队列事件信息
# 解析事件信息(action/callback/teams/user:phone、im、mail等)
# 根据事件信息生成IM、SMS、MAIL内容存入Redis队列
// ReadHighEvent loops forever, popping events from the configured
// high-priority Redis queues and handing each one to consume with
// isHigh=true. It returns immediately when no high queues are configured.
// After a failed pop it waits one second before retrying.
func ReadHighEvent() {
	queues := g.Config().Redis.HighQueues // per the original note, defaults to event:p0, p1, p2
	if len(queues) == 0 {
		return
	}

	for {
		event, err := popEvent(queues)
		if err == nil {
			consume(event, true)
			continue
		}
		time.Sleep(time.Second)
	}
}
## 事件出列并将处理事件入库保存
// popEvent pops one event from the given Redis queues with BRPOP, decodes
// it from JSON, stores it through eventmodel.InsertEvent and returns it.
//
// A Redis failure or a JSON decode failure is logged and returned as the
// error, with a nil event.
func popEvent(queues []string) (*cmodel.Event, error) {
	// BRPOP arguments: every queue key, followed by the timeout.
	args := make([]interface{}, 0, len(queues)+1)
	for _, q := range queues {
		args = append(args, q)
	}
	args = append(args, 0) // timeout 0

	conn := g.RedisConnPool.Get()
	defer conn.Close()

	reply, err := redis.Strings(conn.Do("BRPOP", args...))
	if err != nil {
		log.Errorf("get alarm event from redis fail: %v", err)
		return nil, err
	}

	// The second element of the reply carries the JSON payload.
	event := new(cmodel.Event)
	if err = json.Unmarshal([]byte(reply[1]), event); err != nil {
		log.Errorf("parse alarm event fail: %v", err)
		return nil, err
	}
	log.Debugf("pop event: %s", event.String())

	// Persist the parsed event to the database.
	eventmodel.InsertEvent(event)
	return event, nil
}
## 处理事件
##
// consume dispatches a popped event.
//
// Events whose action id is not positive, or whose action cannot be
// fetched, are dropped. If the action has Callback set to 1 the callback is
// handled first; the event is then routed to the high- or low-priority
// consumer according to isHigh.
func consume(event *cmodel.Event, isHigh bool) {
	id := event.ActionId()
	if id <= 0 {
		return
	}

	action := api.GetAction(id) // look the action up by id
	if action == nil {
		return
	}

	if action.Callback == 1 {
		HandleCallback(event, action)
	}

	if !isHigh {
		consumeLowEvents(event, action)
		return
	}
	consumeHighEvents(event, action)
}
### Callback配置处理
// HandleCallback performs the action's callback and the notifications that
// surround it.
//
// When the action names teams (Uic is non-empty), the generated SMS/IM/mail
// content is queued before the callback if BeforeCallbackSms /
// BeforeCallbackMail are 1, and the callback result message is queued after
// the callback if AfterCallbackSms / AfterCallbackMail are 1. The SMS flags
// also control the IM notifications. The callback itself always runs.
func HandleCallback(event *model.Event, action *api.Action) {
	teams := action.Uic
	notify := teams != ""

	var phones, mails, ims []string
	if notify {
		phones, mails, ims = api.ParseTeams(teams)

		smsContent := GenerateSmsContent(event)
		mailContent := GenerateMailContent(event)
		imContent := GenerateIMContent(event)

		// Notifications sent before the callback.
		if action.BeforeCallbackSms == 1 {
			redi.WriteSms(phones, smsContent)
			redi.WriteIM(ims, imContent)
		}
		if action.BeforeCallbackMail == 1 {
			// The SMS text doubles as the mail subject.
			redi.WriteMail(mails, smsContent, mailContent)
		}
	}

	message := Callback(event, action) // invoke the configured callback URL

	if !notify {
		return
	}

	// Notifications sent after the callback carry its result message.
	if action.AfterCallbackSms == 1 {
		redi.WriteSms(phones, message)
		redi.WriteIM(ims, message)
	}
	if action.AfterCallbackMail == 1 {
		redi.WriteMail(mails, message, message)
	}
}
#### 事件触发回调HTTPAPI URL处理
// Callback issues an HTTP GET to the action's callback URL, passing the
// event details as query parameters, and returns a one-line summary of the
// outcome.
//
// If the URL is empty it returns "callback url is blank" without making a
// request. A request failure is logged and reflected in the returned
// message rather than returned as an error.
func Callback(event *model.Event, action *api.Action) string {
	if action.Url == "" {
		return "callback url is blank"
	}

	// Flatten the pushed tags into "k:v,k:v"; empty when there are none.
	pairs := make([]string, 0, len(event.PushedTags))
	for k, v := range event.PushedTags {
		pairs = append(pairs, fmt.Sprintf("%s:%s", k, v))
	}
	tags := strings.Join(pairs, ",")

	// Query parameters, in the order they are attached to the request.
	params := []struct{ key, value string }{
		{"endpoint", event.Endpoint},
		{"metric", event.Metric()},
		{"status", event.Status},
		{"step", fmt.Sprintf("%d", event.CurrentStep)},
		{"priority", fmt.Sprintf("%d", event.Priority())},
		{"time", event.FormattedTime()},
		{"tpl_id", fmt.Sprintf("%d", event.TplId())},
		{"exp_id", fmt.Sprintf("%d", event.ExpressionId())},
		{"stra_id", fmt.Sprintf("%d", event.StrategyId())},
		{"left_value", utils.ReadableFloat(event.LeftValue)},
		{"tags", tags},
	}

	req := httplib.Get(action.Url).SetTimeout(3*time.Second, 20*time.Second)
	for _, p := range params {
		req.Param(p.key, p.value)
	}

	// Execute the request; the response body is read as a string.
	resp, err := req.String()
	outcome := "success"
	if err != nil {
		log.Errorf("callback fail, action:%v, event:%s, error:%s", action, event.String(), err.Error())
		outcome = fmt.Sprintf("fail:%s", err.Error())
	}

	message := fmt.Sprintf("curl %s %s. resp: %s", action.Url, outcome, resp)
	log.Debugf("callback to url:%s, event:%s, resp:%s", action.Url, event.String(), resp)
	return message
}
### 消费高优先级队列事件
### 高优先级的不做报警合并
### 低优先级处理逻辑相同
// consumeHighEvents queues notifications for a high-priority event, without
// any alarm merging. Nothing is queued when the action has no teams.
// IM and mail are always queued; SMS is queued only for priorities below 3.
func consumeHighEvents(event *cmodel.Event, action *api.Action) {
	teams := action.Uic
	if teams == "" {
		return
	}

	// Resolve the teams into their members' contact details.
	phones, mails, ims := api.ParseTeams(teams)

	smsText := GenerateSmsContent(event)
	mailText := GenerateMailContent(event)
	imText := GenerateIMContent(event)

	// Only P0-P2 events trigger an SMS.
	if event.Priority() < 3 {
		redi.WriteSms(phones, smsText)
	}
	redi.WriteIM(ims, imText)
	// The SMS text doubles as the mail subject.
	redi.WriteMail(mails, smsText, mailText)
}
#### 通过API查询解析维护人员组成员phones, emails, IM
func ParseTeams(teams string) ([]string, []string, []string) {
if teams == "" {
return []string{}, []string{}, []string{}
}
userMap := GetUsers(teams) //API查询获取用户信息map
phoneSet := set.NewStringSet()
mailSet := set.NewStringSet()
imSet := set.NewStringSet()
for _, user := range userMap {
if user.Phone != "" {
phoneSet.Add(user.Phone)
}
if user.Email != "" {
mailSet.Add(user.Email)
}
if user.IM != "" {
imSet.Add(user.IM)
}
}
return phoneSet.ToSlice(), mailSet.ToSlice(), imSet.ToSlice()
}
###### 通过成员组信息查询用户
// GetUsers returns the members of every team in the comma-separated teams
// string, keyed by user name. Empty team names are skipped, and a user who
// belongs to several teams appears once.
func GetUsers(teams string) map[string]*uic.User {
	byName := make(map[string]*uic.User)

	for _, team := range strings.Split(teams, ",") {
		if team == "" {
			continue
		}
		// Ranging over a nil result is a no-op, so teams without users
		// contribute nothing.
		for _, member := range UsersOf(team) {
			byName[member.Name] = member
		}
	}

	return byName
}
// UsersOf returns the members of a team. A non-nil result from CurlUic is
// stored in Users under the team name and returned; when CurlUic returns
// nil, whatever Users already holds for the team is returned instead.
func UsersOf(team string) []*uic.User {
	if fresh := CurlUic(team); fresh != nil {
		Users.Set(team, fresh)
		return fresh
	}
	return Users.Get(team)
}
###### API组件接口,CURL HTTP访问与响应数据处理
// CurlUic fetches the members of a team with an HTTP GET to the configured
// PlusApi endpoint. It returns an empty (non-nil) slice for an empty team
// name and nil when the request or JSON decoding fails; the failure is
// logged.
func CurlUic(team string) []*uic.User {
	if team == "" {
		return []*uic.User{}
	}

	apiCfg := g.Config().Api
	uri := fmt.Sprintf("%s/api/v1/team/name/%s", apiCfg.PlusApi, team)
	req := httplib.Get(uri).SetTimeout(2*time.Second, 10*time.Second)

	// The Apitoken header carries a JSON object with the caller name and
	// the configured token.
	credentials := map[string]string{
		"name": "falcon-alarm",
		"sig":  g.Config().Api.PlusApiToken,
	}
	token, _ := json.Marshal(credentials)
	req.Header("Apitoken", string(token))

	var out APIGetTeamOutput
	if err := req.ToJson(&out); err != nil {
		log.Errorf("curl %s fail: %v", uri, err)
		return nil
	}
	return out.Users
}
#### 依据事件信息生成SMS内容
#### IM/MAIL处理逻辑相同
// GenerateSmsContent returns the SMS text for an event; it delegates to
// BuildCommonSMSContent.
func GenerateSmsContent(event *model.Event) string {
	content := BuildCommonSMSContent(event)
	return content
}
// BuildCommonSMSContent renders an event as a single bracketed line:
// priority, status, endpoint, an empty bracket pair, then note, function,
// metric, sorted tags and the "left operator right" comparison, and finally
// the current step with the formatted time.
func BuildCommonSMSContent(event *model.Event) string {
	const layout = "[P%d][%s][%s][][%s %s %s %s %s%s%s][O%d %s]"

	return fmt.Sprintf(layout,
		event.Priority(), event.Status, event.Endpoint,
		event.Note(), event.Func(), event.Metric(),
		utils.SortedTags(event.PushedTags),
		utils.ReadableFloat(event.LeftValue), event.Operator(), utils.ReadableFloat(event.RightValue()),
		event.CurrentStep, event.FormattedTime(),
	)
}
##### WriteIM WriteSms
func WriteIM(tos []string, content string) {
if len(tos) == 0 {
return
}
im := &model.IM{Tos: strings.Join(tos, ","), Content: content}
WriteIMModel(im) //写入IM队列
}
######
// WriteIMModel serializes im to JSON and pushes it onto the IM Redis queue.
// A nil message is ignored; a marshalling failure is logged and the message
// is dropped.
//
// Per the original annotation, the mail and SMS writers follow the same
// pattern with MAIL_QUEUE_NAME and SMS_QUEUE_NAME.
func WriteIMModel(im *model.IM) {
	if im == nil {
		return
	}

	payload, err := json.Marshal(im)
	if err != nil {
		log.Error(err)
		return
	}

	log.Debugf("write im to queue, im:%v, queue:%s", im, IM_QUEUE_NAME)
	lpush(IM_QUEUE_NAME, string(payload))
}
cron.ReadLowEvent() 同ReadHighEvent,处理低优先级队列(Redis keys)
// ReadLowEvent loops forever, popping events from the configured
// low-priority Redis queues and handing each one to consume with
// isHigh=false. It returns immediately when no low queues are configured.
// After a failed pop it waits one second before retrying.
func ReadLowEvent() {
	queues := g.Config().Redis.LowQueues // per the original note, defaults to event:p3..p6
	if len(queues) == 0 {
		return
	}

	for {
		event, err := popEvent(queues)
		if err == nil {
			consume(event, false)
			continue
		}
		time.Sleep(time.Second)
	}
}
cron.CombineSms() 合并sms内容
// CombineSms runs forever, merging the queued user SMS messages once per
// minute. The first merge happens after the first one-minute wait.
func CombineSms() {
	const interval = time.Minute

	for {
		time.Sleep(interval)
		combineSms()
	}
}
## 合并SMS信息
func combineSms() {
dtos := popAllSmsDto() //SMS队列"/queue/user/sms" RPOP所有事件短信内容信息
count := len(dtos)
if count == 0 {
return
}
//整理事件信息至内存map
//key:Priority Status Phone Metric
//value: pop原事件信息
dtoMap := make(map[string][]*SmsDto)
for i := 0; i < count; i++ {
key := fmt.Sprintf("%d%s%s%s", dtos[i].Priority, dtos[i].Status, dtos[i].Phone, dtos[i].Metric)
if _, ok := dtoMap[key]; ok {
dtoMap[key] = append(dtoMap[key], dtos[i])
} else {
dtoMap[key] = []*SmsDto{dtos[i]}
}
}
//如果同一个KEY,有多条SMS内容则合并成一条提供link链接
for _, arr := range dtoMap {
size := len(arr)
if size == 1 {
redi.WriteSms([]string{arr[0].Phone}, arr[0].Content)
continue
}
// 如果有多条,把多个sms内容写入数据库,只给用户提供一个链接
contentArr := make([]string, size)
for i := 0; i < size; i++ {
contentArr[i] = arr[i].Content
}
content := strings.Join(contentArr, ",,")
first := arr[0].Content
t := strings.Split(first, "][")
eg := ""
if len(t) >= 3 {
eg = t[2]
}
path, err := api.LinkToSMS(content) //API组件调用links
sms := ""
if err != nil || path == "" {
sms = fmt.Sprintf("[P%d][%s] %d %s. e.g. %s. detail in email", arr[0].Priority, arr[0].Status, size, arr[0].Metric, eg)
log.Error("get short link fail", err)
} else {
sms = fmt.Sprintf("[P%d][%s] %d %s e.g. %s %s/portal/links/%s ",
arr[0].Priority, arr[0].Status, size, arr[0].Metric, eg, g.Config().Api.Dashboard, path) //多条合并带连接地址
log.Debugf("combined sms is:%s", sms)
}
redi.WriteSms([]string{arr[0].Phone}, sms) //SMS重入队"/sms" }
}
### 所有SMS内容出队
// popAllSmsDto drains the configured user SMS queue with repeated RPOP
// calls and returns every entry that decodes successfully. Popping stops at
// the first error: redis.ErrNil ends the loop silently, any other error is
// logged. Empty replies and entries that fail to decode are skipped.
func popAllSmsDto() []*SmsDto {
	queue := g.Config().Redis.UserSmsQueue
	conn := g.RedisConnPool.Get()
	defer conn.Close()

	dtos := []*SmsDto{}
	for {
		reply, err := redis.String(conn.Do("RPOP", queue))
		if err != nil {
			if err != redis.ErrNil {
				log.Error("get SmsDto fail", err)
			}
			return dtos
		}

		if reply == "" || reply == "nil" {
			continue
		}

		dto := new(SmsDto)
		if err = json.Unmarshal([]byte(reply), dto); err != nil {
			log.Errorf("json unmarshal SmsDto: %s fail: %v", reply, err)
			continue
		}
		dtos = append(dtos, dto)
	}
}
cron.CombineMail() 合并MAIL内容(同上SMS)
// CombineMail runs forever, merging the queued user mails once per minute.
// The first merge happens after the first one-minute wait.
func CombineMail() {
	const interval = time.Minute

	for {
		time.Sleep(interval)
		combineMail()
	}
}
// combineMail drains the user mail queue and re-queues the mails for
// sending, merging those that share the same priority, status, address and
// metric. A single mail is forwarded unchanged; a larger group becomes one
// mail with a summary subject and the bodies joined by CRLF.
func combineMail() {
	pending := popAllMailDto() // everything currently in the user mail queue
	if len(pending) == 0 {
		return
	}

	// Group mails by priority + status + email + metric.
	groups := make(map[string][]*MailDto)
	for _, dto := range pending {
		key := fmt.Sprintf("%d%s%s%s", dto.Priority, dto.Status, dto.Email, dto.Metric)
		groups[key] = append(groups[key], dto)
	}

	// Do not send from here: write back to Redis instead, otherwise
	// restarting alarm would easily lose data.
	for _, group := range groups {
		head := group[0]
		to := []string{head.Email}

		if len(group) == 1 {
			redi.WriteMail(to, head.Subject, head.Content)
			continue
		}

		subject := fmt.Sprintf("[P%d][%s] %d %s", head.Priority, head.Status, len(group), head.Metric)

		bodies := make([]string, 0, len(group))
		for _, dto := range group {
			bodies = append(bodies, dto.Content)
		}
		content := strings.Join(bodies, "\r\n")

		log.Debugf("combined mail subject:%s, content:%s", subject, content)
		redi.WriteMail(to, subject, content) // re-queue the merged mail for sending
	}
}
cron.CombineIM() 合并IM内容(同上SMS)
// CombineIM runs forever, merging the queued user IM messages once per
// minute. The first merge happens after the first one-minute wait.
func CombineIM() {
	const interval = time.Minute

	for {
		time.Sleep(interval)
		combineIM()
	}
}
// combineIM drains the user IM queue and re-queues the messages for
// sending, merging those that share the same priority, status, IM id and
// metric. A single message is forwarded unchanged; a larger group becomes
// one summary message that links to the combined content, or carries a
// "detail in email" hint when no link could be created.
func combineIM() {
	pending := popAllImDto() // everything currently in the user IM queue
	if len(pending) == 0 {
		return
	}

	// Group messages by priority + status + IM id + metric.
	groups := make(map[string][]*ImDto)
	for _, dto := range pending {
		key := fmt.Sprintf("%d%s%s%s", dto.Priority, dto.Status, dto.IM, dto.Metric)
		groups[key] = append(groups[key], dto)
	}

	for _, group := range groups {
		head := group[0]
		count := len(group)
		to := []string{head.IM}

		if count == 1 {
			redi.WriteIM(to, head.Content)
			continue
		}

		// Several messages: hand the combined content to the API and give
		// the user a single link instead.
		bodies := make([]string, 0, count)
		for _, dto := range group {
			bodies = append(bodies, dto.Content)
		}
		combined := strings.Join(bodies, ",,")

		// Use the third bracketed segment of the first message as the example.
		example := ""
		if parts := strings.Split(head.Content, "]["); len(parts) >= 3 {
			example = parts[2]
		}

		var chat string
		path, err := api.LinkToSMS(combined)
		if err == nil && path != "" {
			chat = fmt.Sprintf("[P%d][%s] %d %s e.g. %s %s/portal/links/%s ",
				head.Priority, head.Status, count, head.Metric, example, g.Config().Api.Dashboard, path)
			log.Debugf("combined im is:%s", chat)
		} else {
			chat = fmt.Sprintf("[P%d][%s] %d %s. e.g. %s. detail in email", head.Priority, head.Status, count, head.Metric, example)
			log.Error("create short link fail", err)
		}

		redi.WriteIM(to, chat) // re-queue the merged IM for sending
	}
}
cron.ConsumeSms() 发送SMS事件通知
# 发送SMS事件通知入口函数
// ConsumeSms is the SMS sending loop. It repeatedly drains the SMS queue
// and dispatches whatever it finds; when the queue is empty it sleeps
// 200ms before polling again.
func ConsumeSms() {
	for {
		if batch := redi.PopAllSms(); len(batch) > 0 {
			SendSmsList(batch)
		} else {
			time.Sleep(time.Millisecond * 200)
		}
	}
}
## Pop所有'/sms'队列信息
// PopAllSms drains the SMS send queue with repeated RPOP calls and returns
// every entry that decodes successfully. Popping stops at the first error:
// redis.ErrNil ends the loop silently, any other error is logged. Empty
// replies and entries that fail to decode are skipped.
func PopAllSms() []*model.Sms {
	queue := SMS_QUEUE_NAME
	conn := g.RedisConnPool.Get()
	defer conn.Close()

	batch := []*model.Sms{}
	for {
		reply, err := redis.String(conn.Do("RPOP", queue))
		if err != nil {
			if err != redis.ErrNil {
				log.Error(err)
			}
			return batch
		}

		if reply == "" || reply == "nil" {
			continue
		}

		sms := new(model.Sms)
		if err = json.Unmarshal([]byte(reply), sms); err != nil {
			log.Error(err, reply)
			continue
		}
		batch = append(batch, sms)
	}
}
## 处理SMS发送
// SendSmsList sends every SMS in L, each in its own goroutine. A slot is
// taken from SmsWorkerChan before each goroutine starts, so this call
// blocks while the channel is full.
func SendSmsList(L []*model.Sms) {
	for i := range L {
		SmsWorkerChan <- 1 // take a worker slot; SendSms gives it back
		go SendSms(L[i])
	}
}
###发送SMS实现函数
// SendSms posts one SMS to the configured SMS API with the "tos" and
// "content" parameters. A failure is logged; the response is logged at
// debug level either way. One value is received from SmsWorkerChan on
// return.
func SendSms(sms *model.Sms) {
	// Give back the worker slot taken by SendSmsList.
	defer func() { <-SmsWorkerChan }()

	endpoint := g.Config().Api.Sms
	req := httplib.Post(endpoint).SetTimeout(5*time.Second, 30*time.Second)
	req.Param("tos", sms.Tos)
	req.Param("content", sms.Content)

	resp, err := req.String()
	if err != nil {
		log.Errorf("send sms fail, tos:%s, cotent:%s, error:%v", sms.Tos, sms.Content, err)
	}
	log.Debugf("send sms:%v, resp:%v, url:%s", sms, resp, endpoint)
}
cron.ConsumeIM() 发送IM事件通知(同上SMS)
// ConsumeIM is the IM sending loop. It repeatedly drains the IM queue and
// dispatches whatever it finds; when the queue is empty it sleeps 200ms
// before polling again.
func ConsumeIM() {
	for {
		if batch := redi.PopAllIM(); len(batch) > 0 {
			SendIMList(batch)
		} else {
			time.Sleep(time.Millisecond * 200)
		}
	}
}
// SendIMList sends every IM message in L, each in its own goroutine. A slot
// is taken from IMWorkerChan before each goroutine starts, so this call
// blocks while the channel is full.
func SendIMList(L []*model.IM) {
	for i := range L {
		IMWorkerChan <- 1 // take a worker slot; SendIM gives it back
		go SendIM(L[i])
	}
}
// SendIM posts one IM message to the configured IM API with the "tos" and
// "content" parameters. A failure is logged; the response is logged at
// debug level either way. One value is received from IMWorkerChan on
// return.
func SendIM(im *model.IM) {
	// Give back the worker slot taken by SendIMList.
	defer func() { <-IMWorkerChan }()

	endpoint := g.Config().Api.IM
	req := httplib.Post(endpoint).SetTimeout(5*time.Second, 30*time.Second)
	req.Param("tos", im.Tos)
	req.Param("content", im.Content)

	resp, err := req.String()
	if err != nil {
		log.Errorf("send im fail, tos:%s, cotent:%s, error:%v", im.Tos, im.Content, err)
	}
	log.Debugf("send im:%v, resp:%v, url:%s", im, resp, endpoint)
}
cron.ConsumeMail() 发送MAIL事件通知(同上SMS)
// ConsumeMail is the mail sending loop. It repeatedly drains the mail queue
// and dispatches whatever it finds; when the queue is empty it sleeps
// 200ms before polling again.
func ConsumeMail() {
	for {
		if batch := redi.PopAllMail(); len(batch) > 0 {
			SendMailList(batch)
		} else {
			time.Sleep(time.Millisecond * 200)
		}
	}
}
// SendMailList sends every mail in L, each in its own goroutine. A slot is
// taken from MailWorkerChan before each goroutine starts, so this call
// blocks while the channel is full.
func SendMailList(L []*model.Mail) {
	for i := range L {
		MailWorkerChan <- 1 // take a worker slot; SendMail gives it back
		go SendMail(L[i])
	}
}
// SendMail posts one mail to the configured mail API with the "tos",
// "subject" and "content" parameters. A failure is logged; the response is
// logged at debug level either way. One value is received from
// MailWorkerChan on return.
func SendMail(mail *model.Mail) {
	// Give back the worker slot taken by SendMailList.
	defer func() { <-MailWorkerChan }()

	endpoint := g.Config().Api.Mail
	req := httplib.Post(endpoint).SetTimeout(5*time.Second, 30*time.Second)
	req.Param("tos", mail.Tos)
	req.Param("subject", mail.Subject)
	req.Param("content", mail.Content)

	resp, err := req.String()
	if err != nil {
		log.Errorf("send mail fail, receiver:%s, subject:%s, cotent:%s, error:%v", mail.Tos, mail.Subject, mail.Content, err)
	}
	log.Debugf("send mail:%v, resp:%v, url:%s", mail, resp, endpoint)
}
cron.CleanExpiredEvent() //清理过旧的事件信息
// CleanExpiredEvent runs forever, deleting stored events older than the
// configured retention period once every 60 seconds. The housekeeper
// settings are re-read on every pass.
func CleanExpiredEvent() {
	for {
		days := g.Config().Housekeeper.EventRetentionDays // per the original note, defaults to 7 days
		batch := g.Config().Housekeeper.EventDeleteBatch  // per the original note, defaults to 100 rows

		// Everything timestamped before the cutoff is eligible for deletion.
		cutoff := time.Now().Add(time.Duration(-days*24) * time.Hour)
		eventmodel.DeleteEventOlder(cutoff, batch)

		time.Sleep(time.Second * 60)
	}
}
// DeleteEventOlder deletes at most limit rows from the events table whose
// timestamp is earlier than before. The outcome is only logged: failures at
// error level, the affected row count at debug level.
func DeleteEventOlder(before time.Time, limit int) {
	cutoff := before.Format(timeLayout)
	const stmt = `delete from events where timestamp<? limit ?`

	result, err := orm.NewOrm().Raw(stmt, cutoff, limit).Exec()
	if err != nil {
		log.Errorf("delete event older than %v fail, error:%v", cutoff, err)
		return
	}

	affected, _ := result.RowsAffected()
	log.Debugf("delete event older than %v, rows affected:%v", cutoff, affected)
}
技术经验借鉴
- 报警合并机制实现
- 事件异步化和队列实现
扩展学习
网友评论