美文网首页
KubeSphere Executor 模块源码学习总结

KubeSphere Executor 模块源码学习总结

作者: 七齐起器 | 来源:发表于2021-03-09 16:41 被阅读0次
整体框架图.png

Executor总述:

Executor 是对告警进行增删查改和触发告警的最小执行单元;

Executor 构成:

整体由功能划分为四个模块

  • Alert Receiver:加载 MySQL 数据库里 Alert 表的信息,生成 Alert Runner;(Alert Runner 详解:https://www.jianshu.com/p/43887f31769a)
  • Alive Reporter :读取etcd的信息,检查 Alert Runner是否还存在;往etcd 不断发送消息,维持心跳;
  • HealthChecker :检查Alert Runner,把 wildRunners(不符合要求的)给Terminate掉;
  • Broadcast Receiver :读取etcd的信息,监控资源的变动信息,根据变动 Operation 而操作;

Executor监控模块

从 /cmd/alert/main.go 的 main 函数开始

  • cfg := config.GetInstance().LoadConf() 先加载全局配置;
  • 从 mainFuncExecutor() 进入 executor 的初始化、首次心跳和服务(分别对应 Init(host) / HeartBoot() / Serve());注意第一次给 etcd 发送心跳调用的是 HeartBoot() 而不是 HeartBeat();
  • 初始化 executor:executor 先初始化四个模块( alertReceiver / aliveReporter / broadcastReceiver / healthChecker ),然后再装载进 executor 中;
  • executor 的第一次心跳:调用 aliveReporter.HeartBoot(),向 etcd 写入一个绑定租约(TTL 为 30)的存活 key。
  • 开始服务:以 goroutine 方式启动 alertReceiver.Serve()、broadcastReceiver.WatchBroadcast()、healthChecker.HealthCheck(),最后在当前 goroutine 中调用 aliveReporter.HeartBeat();

/cmd/alert/main.go

// mainFuncExecutor runs the process in "executor" mode. The executor is named
// after the host, stored in the package-level e, sends its first heartbeat
// (HeartBoot) and then serves. If the host name cannot be read, the error is
// logged and the function returns without starting anything.
//
// NOTE(review): an executor-name uniqueness check (e.CheckExist, followed by
// SIGHUP to self) used to sit between Init and HeartBoot; it is disabled.
func mainFuncExecutor() {
	hostName, hostErr := os.Hostname()
	if hostErr != nil {
		logger.Error(nil, "Get Host Name error: %s", hostErr)
		return
	}

	e = executor.Init(hostName)
	e.HeartBoot()
	e.Serve()
}

// main installs the exit handler, loads the global configuration and
// dispatches to the entry point selected by cfg.App.RunMode. An unknown mode
// is logged and main simply returns.
func main() {
	exitHandler()

	runModes := map[string]func(){
		"executor": mainFuncExecutor,
		"watcher":  mainFuncWatcher,
		"manager":  mainFuncManager,
		"client":   mainFuncClient,
	}

	cfg := config.GetInstance().LoadConf()
	entry, known := runModes[cfg.App.RunMode]
	if !known {
		logger.Error(nil, "Run mode error, exiting...")
		return
	}
	entry()
}

/pkg/services/executor/executor.go

// Init builds an Executor called name together with its four components:
// alert receiver, alive reporter, broadcast receiver and health checker.
// It then gives every component a back-reference to the new executor.
func Init(name string) *Executor {
	receiver := NewAlertReceiver()
	reporter := NewAliveReporter()
	broadcast := NewBroadcastReceiver()
	checker := NewHealthChecker()

	ex := NewExecutor(name, receiver, reporter, broadcast, checker)

	// NewExecutor needs the components, so they are created first.
	// The back-references can only be wired once the executor exists.
	receiver.SetExecutor(ex)
	reporter.SetExecutor(ex)
	broadcast.SetExecutor(ex)
	checker.SetExecutor(ex)

	return ex
}

// HeartBoot sends the executor's first heartbeat. All of the work is delegated
// to the executor's AliveReporter.
func (e *Executor) HeartBoot() {
	e.aliveReporter.HeartBoot() // initial registration, as opposed to HeartBeat
}

// Serve starts the alert receiver, broadcast watcher and health checker in
// their own goroutines. It then calls the alive reporter's HeartBeat on the
// calling goroutine. Serve returns only when HeartBeat returns; presumably
// HeartBeat is a long-running loop, but that is not visible here.
//
// NOTE(review): the background goroutines have no stop or wait mechanism here.
func (e *Executor) Serve() {
	go e.alertReceiver.Serve()
	go e.broadcastReceiver.WatchBroadcast()
	go e.healthChecker.HealthCheck()

	e.aliveReporter.HeartBeat()
}

/pkg/services/executor/alive_reporter.go

// AliveReporter publishes its executor's liveness record (name and task count)
// to etcd under a leased key.
type AliveReporter struct {
	executor *Executor // owner; wired by SetExecutor after construction
}

// NewAliveReporter returns an AliveReporter with no executor attached.
// SetExecutor must be called before the reporter touches its executor.
func NewAliveReporter() *AliveReporter {
	return &AliveReporter{}
}

// SetExecutor attaches the executor whose name and task count this reporter
// writes to etcd.
func (ar *AliveReporter) SetExecutor(owner *Executor) {
	ar.executor = owner
}

// putKey writes this executor's liveness record to etcd.
//
// The record is an ExecutorInfo (executor name and current task count)
// encoded as JSON. It is stored under "alert-executors/<executor name>" and
// attached to a newly granted lease whose TTL is expireTime (etcd lease TTLs
// are in seconds). The key therefore goes away unless it is written again
// before that lease expires.
//
// Any failure (encoding, lease grant, put) is logged here and returned.
// A successful call returns nil.
//
// NOTE(review): each call grants a fresh lease and never revokes the previous
// one, so older leases linger until their own TTL expires.
func (ar *AliveReporter) putKey(expireTime int64) error {
	ctx := context.Background()
	cli := global.GetInstance().GetEtcd()

	name := ar.executor.GetName()
	key := "alert-executors/" + name

	info := &ExecutorInfo{
		Name:      name,
		TaskCount: ar.executor.GetTaskCount(),
	}

	// The marshal error used to be discarded, which would have stored an
	// empty value under the key. Report it instead.
	value, err := json.Marshal(info)
	if err != nil {
		logger.Error(nil, "AliveReporter marshal executor info for [%s] failed: %+v", key, err)
		return err
	}

	lease, err := cli.Grant(ctx, expireTime)
	if err != nil {
		logger.Error(nil, "Grant TTL from etcd failed: %+v", err)
		return err
	}

	if _, err = cli.Put(ctx, key, string(value), clientv3.WithLease(lease.ID)); err != nil {
		logger.Error(nil, "AliveReporter putKey [%s] [%s] to etcd failed: %+v", key, string(value), err)
		return err
	}

	return nil
}

// HeartBoot writes the executor's first liveness record to etcd with a
// 30-unit lease TTL; etcd interprets lease TTLs as seconds. A failure is
// logged and otherwise ignored, so startup continues either way.
func (ar *AliveReporter) HeartBoot() {
	const bootLeaseTTL int64 = 30

	if err := ar.putKey(bootLeaseTTL); err != nil {
		logger.Error(nil, "AliveReporter HeartBoot failed: %+v", err)
	}
}

相关文章

网友评论

      本文标题:KubeSphere Executor 模块源码学习总结

      本文链接:https://www.haomeiwen.com/subject/jxxbqltx.html