
Executor总述:
Executor 是对告警进行增删查改和触发告警的最小执行单元;
Executor 构成:
整体由功能划分为四个模块
- Alert Runner :加载MySQL数据库里 Alert 表信息,生成Alert Runner;( Alert Runner 详解 : https://www.jianshu.com/p/43887f31769a )
- Alive Reporter :读取etcd的信息,检查 Alert Runner是否还存在;往etcd 不断发送消息,维持心跳;
- HealthChecker :检查Alert Runner,把 wildRunners(不符合要求的)给Terminate掉;
- Broadcast Receiver :读取etcd的信息,监控资源的变动信息,根据变动 Operation 而操作;
Executor监控模块
从 /cmd/alert/main.go 的 main 函数开始
- cfg := config.GetInstance().LoadConf() 先加载全局配置;
- 从 mainFuncExecutor() 进入 executor的初始化、开始心跳、服务(对应 Init(host) /HeartBoot() / Serve()) ;注意第一次给etcd发送心跳是heartboot不是heartbeat ;
- 初始化 executor:executor 先初始化四个模块( alertReceiver / aliveReporter / broadcastReceiver / healthChecker ),然后再装载进 executor 中;
- 初始化 executor 第一次心跳:调用 aliveReporter.HeartBoot() 方法给 etcd 发送心跳。
- 开始服务:调用 alertReceiver.serve()、broadcastReceiver.WatchBroadcast()、healthChecker.HealthCheck()、aliveReporter.HeartBeat();
/cmd/alert/main.go
func mainFuncExecutor() {
host, err := os.Hostname()
if err != nil {
logger.Error(nil, "Get Host Name error: %s", err)
return
}
e = executor.Init(host)
//if e.CheckExist() {
// logger.Error(nil, "Executor ID already used, exiting...")
// syscall.Kill(os.Getpid(), syscall.SIGHUP)
//}
e.HeartBoot()
e.Serve()
}
func main() {
exitHandler()
cfg := config.GetInstance().LoadConf()
switch cfg.App.RunMode {
case "executor":
mainFuncExecutor()
case "watcher":
mainFuncWatcher()
case "manager":
mainFuncManager()
case "client":
mainFuncClient()
default:
logger.Error(nil, "Run mode error, exiting...")
}
}
/pkg/services/executor/executor.go
func Init(name string) *Executor {
alertReceiver := NewAlertReceiver()
aliveReporter := NewAliveReporter()
broadcastReceiver := NewBroadcastReceiver()
healthChecker := NewHealthChecker()
executor := NewExecutor(name, alertReceiver, aliveReporter, broadcastReceiver, healthChecker)
alertReceiver.SetExecutor(executor)
aliveReporter.SetExecutor(executor)
broadcastReceiver.SetExecutor(executor)
healthChecker.SetExecutor(executor)
return executor
}
func (e *Executor) HeartBoot() {
e.aliveReporter.HeartBoot()
}
func (e *Executor) Serve() {
go e.alertReceiver.Serve()
go e.broadcastReceiver.WatchBroadcast()
go e.healthChecker.HealthCheck()
e.aliveReporter.HeartBeat()
}
/pkg/services/executor/alive_reporter.go
type AliveReporter struct {
executor *Executor
}
func NewAliveReporter() *AliveReporter {
ar := &AliveReporter{}
return ar
}
func (ar *AliveReporter) SetExecutor(executor *Executor) {
ar.executor = executor
}
func (ar *AliveReporter) putKey(expireTime int64) error {
ctx := context.Background()
e := global.GetInstance().GetEtcd()
key := "alert-executors/" + ar.executor.GetName()
info := &ExecutorInfo{
Name: ar.executor.GetName(),
TaskCount: ar.executor.GetTaskCount(),
}
value, _ := json.Marshal(info)
resp, err := e.Grant(ctx, expireTime)
if err != nil {
logger.Error(nil, "Grant TTL from etcd failed: %+v", err)
return err
}
_, err = e.Put(ctx, key, string(value), clientv3.WithLease(resp.ID))
if err != nil {
logger.Error(nil, "AliveReporter putKey [%s] [%s] to etcd failed: %+v", key, string(value), err)
return err
}
return err
}
func (ar *AliveReporter) HeartBoot() {
err := ar.putKey(30)
if err != nil {
logger.Error(nil, "AliveReporter HeartBoot failed: %+v", err)
}
}
网友评论