美文网首页
KubeSphere Executor 模块 Alert Runner

KubeSphere Executor 模块 Alert Runner

作者: 七齐起器 | 来源:发表于2021-03-10 09:31 被阅读0次
整体框架图

Executor 模块调用 alertReceiver.Serve() 之后,Alert Runner 相关的流程就开始工作,主要做了以下事情:

  • 首先,ar.ExtractAlerts() 不断从消息队列中取出 Alert ID,并放入 channel runningAlertIds 中;
  • ar.HandleAlert 从 runningAlertIds 中取出一个 Alert ID,调用 ar.executor.AddAlert 来添加新的 Alert;
  • 从 executor 的代码可以看到,每次 AddAlert 都会调用一次 startRunner;Executor 的 runner 字段实际上是一个由读写锁(sync.RWMutex)保护的 map,key 为 alertId,value 为 *AlertRunner;
  • startRunner 先检查该 alert 是否已有对应的 runner,已存在则直接返回;否则查询数据库,确认该 alert 处于 adding 或 migrating 状态且尚未分配 executor,然后将数据库中的 alert 状态更新为 running,创建 AlertRunner 并登记到 map 中,最后通过 go runner.Run(initStatus) 启动 runner;

/pkg/services/executor/alert_receiver.go

// AlertReceiver moves alert IDs from a queue topic to the executor.
// ExtractAlerts is the producer side, HandleAlert the consumer side.
type AlertReceiver struct {
    // alertQueue is the topic ExtractAlerts dequeues alert IDs from.
    // ExtractAlerts refuses to run while it is nil.
    alertQueue      lib.Topic
    // runningAlertIds buffers alert IDs between ExtractAlerts and the
    // HandleAlert workers.
    runningAlertIds chan string
    // executor receives every dequeued alert via AddAlert; it is assigned
    // through SetExecutor.
    executor        *Executor
}

// NewAlertReceiver builds an AlertReceiver whose alert queue is configured
// from the global config (queue type and address).
//
// It always returns a non-nil receiver. When the queue client or the alert
// topic cannot be set up, the failure is logged and alertQueue is left nil,
// which ExtractAlerts reports as "AlertQueue not initialized".
func NewAlertReceiver() *AlertReceiver {
    cfg := config.GetInstance()

    ar := &AlertReceiver{
        runningAlertIds: make(chan string, 1000),
    }

    c, err := q.New(cfg.Queue.Type, map[string]interface{}{
        "connStr": cfg.Queue.Addr,
    })
    if err != nil {
        logger.Error(nil, "Failed to connect redis queue: %+v.", err)
        // The client is not usable after a failed q.New; calling SetTopic
        // on it could dereference nil, so stop here.
        return ar
    }

    topic, err := c.SetTopic(constants.AlertTopicPrefix)
    if err != nil {
        logger.Error(nil, "Failed to set alert topic: %+v.", err)
        return ar
    }
    ar.alertQueue = topic

    return ar
}

// SetExecutor assigns the executor that HandleAlert forwards dequeued alert
// IDs to.
func (ar *AlertReceiver) SetExecutor(executor *Executor) {
    ar.executor = executor
}

// Serve starts the receiver's background goroutines and returns immediately:
// one goroutine running ExtractAlerts, plus constants.MaxWorkingAlerts
// goroutines running HandleAlert.
func (ar *AlertReceiver) Serve() {
    go func() {
        // ExtractAlerts only returns when it fails; log the error instead
        // of dropping it, otherwise the workers sit idle with no trace.
        if err := ar.ExtractAlerts(); err != nil {
            logger.Error(nil, "AlertReceiver stopped extracting alerts: %+v", err)
        }
    }()

    for i := 0; i < constants.MaxWorkingAlerts; i++ {
        go ar.HandleAlert(strconv.Itoa(i))
    }
}

// ExtractAlerts moves alert IDs from the alert queue into runningAlertIds.
//
// It returns an error immediately when the queue was never initialized.
// Otherwise it loops forever: a failed dequeue is logged and retried after
// three seconds, and a successful one is sent on runningAlertIds, blocking
// while that channel is full.
func (ar *AlertReceiver) ExtractAlerts() error {
    if ar.alertQueue == nil {
        return errors.New("AlertQueue not initialized")
    }

    for {
        alertId, err := ar.alertQueue.Dequeue()
        if err != nil {
            logger.Error(nil, "AlertReceiver failed to dequeue alert from etcd queue: %+v", err)
            // Back off before retrying so a broken queue does not spin the CPU.
            time.Sleep(3 * time.Second)
            continue
        }

        logger.Debug(nil, "AlertReceiver dequeue alert [%s] from etcd queue succeed", alertId)
        ar.runningAlertIds <- alertId
    }
}

// HandleAlert is a worker loop that never returns: it receives one alert ID
// at a time from runningAlertIds and passes it to the executor's AddAlert.
// handlerNum is accepted but not used by the loop.
func (ar *AlertReceiver) HandleAlert(handlerNum string) {
    for {
        id := <-ar.runningAlertIds
        logger.Debug(nil, "AlertReceiver handle alert [%s] from etcd queue", id)
        ar.executor.AddAlert(id)
    }
}

/pkg/services/executor/executor.go

// Runner is the executor's registry of alert runners: a map guarded by the
// embedded RWMutex.
type Runner struct {
    sync.RWMutex
    // Map is keyed by alert ID; startRunner locks the Runner around every
    // access to it.
    Map map[string]*AlertRunner
}

// NewExecutor assembles an Executor from its collaborators and gives it an
// empty runner registry.
func NewExecutor(name string, alertReceiver *AlertReceiver, aliveReporter *AliveReporter, broadcastReceiver *BroadcastReceiver, healthChecker *HealthChecker) *Executor {
    registry := &Runner{Map: map[string]*AlertRunner{}}

    return &Executor{
        name:              name,
        runner:            registry,
        alertReceiver:     alertReceiver,
        aliveReporter:     aliveReporter,
        broadcastReceiver: broadcastReceiver,
        healthChecker:     healthChecker,
    }
}

// AddAlert starts a runner for alertId by delegating to startRunner.
// The success flag startRunner returns is intentionally discarded;
// startRunner logs its own failures.
func (e *Executor) AddAlert(alertId string) {
    _ = e.startRunner(alertId)
}

// startRunner claims the alert identified by alertId for this executor and
// starts an AlertRunner goroutine for it.
//
// It returns false, after logging the reason, when:
//   - a runner for alertId is already registered,
//   - the stored alert is not in the "adding" or "migrating" state, or is
//     already assigned to an executor,
//   - marking the alert as "running" in the store fails.
//
// It returns true once the runner is registered and its Run goroutine has
// been launched.
func (e *Executor) startRunner(alertId string) bool {
    // Cheap early exit; a read lock is enough for a lookup.
    e.runner.RLock()
    _, exists := e.runner.Map[alertId]
    e.runner.RUnlock()
    if exists {
        logger.Error(nil, "Executor startRunner error: alert %s already exists", alertId)
        return false
    }

    // Query DB, check if this alert is in adding or migrating state.
    alert := rs.GetAlertInfo(alertId)
    dispatchable := (alert.RunningStatus == "adding" || alert.RunningStatus == "migrating") && alert.ExecutorId == ""
    if !dispatchable {
        logger.Error(nil, "Executor startRunner error: alert %s should not be dispatched", alertId)
        return false
    }

    initStatus := alert.RunningStatus

    // Update DB, set this alert to running.
    if err := rs.UpdateAlertInfo(alertId, e.name, "running"); err != nil {
        logger.Error(nil, "Executor startRunner error: update alert %s error: %+v", alertId, err)
        return false
    }

    runner := NewAlertRunner(alertId, e.healthChecker.UpdateCh)

    // The lock was released while talking to the store, so another worker may
    // have registered the same alert in the meantime. Re-check under the write
    // lock: overwriting the entry would orphan the runner already started.
    e.runner.Lock()
    if _, raced := e.runner.Map[alertId]; raced {
        e.runner.Unlock()
        logger.Error(nil, "Executor startRunner error: alert %s already exists", alertId)
        return false
    }
    e.runner.Map[alertId] = runner
    e.runner.Unlock()

    go runner.Run(initStatus)

    logger.Debug(nil, "Executor startRunner %s success", alertId)

    return true
}

/pkg/services/executor/runner.go

// loadAlertInfo refreshes the runner's configuration from the stored alert
// detail: resource selectors are copied over first, then the notification,
// policy, rule and status parsers are run in that order.
func (ar *AlertRunner) loadAlertInfo() {
    detail := rs.QueryAlertDetail(ar.AlertConfig.AlertId)

    // Resource type and filter selectors are copied verbatim.
    ar.AlertConfig.RsTypeName, ar.AlertConfig.RsTypeParam = detail.RsTypeName, detail.RsTypeParam
    ar.AlertConfig.RsFilterName, ar.AlertConfig.RsFilterParam = detail.RsFilterName, detail.RsFilterParam

    // Notification address list.
    ar.parseNotification(detail.NfAddressListId)

    // Policy configuration.
    ar.parsePolicyConfig(detail)

    // Rules configuration.
    ar.parseRules()

    // Alert status.
    ar.parseAlertConfigStatus(detail)

    logger.Debug(nil, "loadAlertInfo alert: %v", ar)
}

// Run is the runner's main loop. It loads the alert configuration, then
// evaluates the alert rules on every tick and reacts to operations received
// on SignalCh until a "Stop" operation arrives.
//
// initStatus is the alert's state when it was picked up: status is reset
// only for "adding"; any other value (e.g. a migrated alert) keeps the
// current status.
//
// Recognized operations on SignalCh:
//   - "Stop": drain pending signals and return.
//   - "Update": reload the configuration and reset the status.
//   - "Comment <arg>": exactly two space-separated fields; <arg> is passed
//     to commentAlert.
//
// Anything else is ignored.
func (ar *AlertRunner) Run(initStatus string) {
    ticker := time.NewTicker(time.Second * TickPeriodSecond)
    defer ticker.Stop()

    ar.loadAlertInfo()
    ar.updateAlertUpdateTime()

    if initStatus == "adding" {
        ar.AlertStatus.Lock()
        ar.resetAlertStatus()
        ar.AlertStatus.Unlock()
    }

    for {
        select {
        case <-ticker.C:
            ar.runAlertRules()
            logger.Debug(nil, "AlertRunner alert %s run", ar.AlertConfig.AlertId)
            ar.updateAlertUpdateTime()

        case op := <-ar.SignalCh:
            if op == "Stop" {
                // Discard whatever is still queued before exiting.
                for len(ar.SignalCh) > 0 {
                    <-ar.SignalCh
                }
                logger.Debug(nil, "AlertRunner alert %s stop", ar.AlertConfig.AlertId)
                return
            }

            if op == "Update" {
                ar.loadAlertInfo()
                ar.AlertStatus.Lock()
                ar.resetAlertStatus()
                ar.AlertStatus.Unlock()
                ar.updateAlertUpdateTime()
                logger.Debug(nil, "AlertRunner alert %s update", ar.AlertConfig.AlertId)
                continue
            }

            // Parameterized operations have the form "<verb> <arg>".
            fields := strings.Split(op, " ")
            if len(fields) == 2 && fields[0] == "Comment" {
                ar.commentAlert(fields[1])
                ar.updateAlertUpdateTime()
                logger.Debug(nil, "AlertRunner alert %s comment", ar.AlertConfig.AlertId)
            }
        }
    }
}

相关文章

网友评论

      本文标题:KubeSphere Executor 模块 Alert Runner

      本文链接:https://www.haomeiwen.com/subject/vljbqltx.html