Prometheus alerting-related code

Author: wwq2020 | Published 2021-11-08 16:54

    In Prometheus:

    In cmd/prometheus/main.go:

    func main() {
    ...
        queryEngine = promql.NewEngine(opts)
    ...
        localStorage  = &readyStorage{stats: tsdb.NewDBStats()}
        scraper       = &readyScrapeManager{}
        remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, cfg.localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper)
        fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage)
    ...
        ruleManager = rules.NewManager(&rules.ManagerOptions{
                Appendable:      fanoutStorage,
                Queryable:       localStorage,
                QueryFunc:       rules.EngineQueryFunc(queryEngine, fanoutStorage),
                NotifyFunc:      sendAlerts(notifierManager, cfg.web.ExternalURL.String()),
                Context:         ctxRule,
                ExternalURL:     cfg.web.ExternalURL,
                Registerer:      prometheus.DefaultRegisterer,
                Logger:          log.With(logger, "component", "rule manager"),
                OutageTolerance: time.Duration(cfg.outageTolerance),
                ForGracePeriod:  time.Duration(cfg.forGracePeriod),
                ResendDelay:     time.Duration(cfg.resendDelay),
        })
    ...
        return ruleManager.Update(
                        time.Duration(cfg.GlobalConfig.EvaluationInterval),
                        files,
                        cfg.GlobalConfig.ExternalLabels,
                        externalURL,
        )
    ...
            ruleManager.Run()
        ...
        err := discoveryManagerNotify.Run()
        ...
        notifierManager.Run(discoveryManagerNotify.SyncCh())
        ...
    }
    
    func sendAlerts(s sender, externalURL string) rules.NotifyFunc {
        return func(ctx context.Context, expr string, alerts ...*rules.Alert) {
            var res []*notifier.Alert
    
            for _, alert := range alerts {
                a := &notifier.Alert{
                    StartsAt:     alert.FiredAt,
                    Labels:       alert.Labels,
                    Annotations:  alert.Annotations,
                    GeneratorURL: externalURL + strutil.TableLinkForExpression(expr),
                }
                if !alert.ResolvedAt.IsZero() {
                    a.EndsAt = alert.ResolvedAt
                } else {
                    a.EndsAt = alert.ValidUntil
                }
                res = append(res, a)
            }
    
            if len(alerts) > 0 {
                s.Send(res...)
            }
        }
    }
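
    The sendAlerts closure above is the only glue between rule evaluation and the
    notifier: it converts rules.Alert values into notifier.Alert values and hands
    them to anything that has a Send method. Below is a minimal, self-contained
    sketch of that pattern; Alert, notifierAlert, sender and printSender are
    simplified stand-ins for illustration, not the real Prometheus types.

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // Alert is a simplified stand-in for a rule-side alert.
    type Alert struct {
        Name       string
        FiredAt    time.Time
        ResolvedAt time.Time
        ValidUntil time.Time
    }

    // notifierAlert is a simplified stand-in for the notifier-side alert.
    type notifierAlert struct {
        Name     string
        StartsAt time.Time
        EndsAt   time.Time
    }

    // sender mirrors the small interface main.go relies on: anything with Send.
    type sender interface {
        Send(alerts ...notifierAlert)
    }

    // printSender just prints what would be queued for Alertmanager.
    type printSender struct{}

    func (printSender) Send(alerts ...notifierAlert) {
        for _, a := range alerts {
            fmt.Printf("queueing %s starts=%s ends=%s\n", a.Name, a.StartsAt, a.EndsAt)
        }
    }

    // NotifyFunc is the hook the rule manager calls after each evaluation.
    type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert)

    // sendAlerts picks EndsAt the same way the real code does: ResolvedAt if it
    // is set, otherwise ValidUntil, so firing alerts expire if not resent.
    func sendAlerts(s sender) NotifyFunc {
        return func(ctx context.Context, expr string, alerts ...*Alert) {
            var res []notifierAlert
            for _, alert := range alerts {
                a := notifierAlert{Name: alert.Name, StartsAt: alert.FiredAt}
                if !alert.ResolvedAt.IsZero() {
                    a.EndsAt = alert.ResolvedAt
                } else {
                    a.EndsAt = alert.ValidUntil
                }
                res = append(res, a)
            }
            if len(res) > 0 {
                s.Send(res...)
            }
        }
    }

    func main() {
        notify := sendAlerts(printSender{})
        now := time.Now()
        notify(context.Background(), `up == 0`,
            &Alert{Name: "InstanceDown", FiredAt: now, ValidUntil: now.Add(4 * time.Minute)})
    }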
    

    In notifier/notifier.go:

    func (n *Manager) nextBatch() []*Alert {
        n.mtx.Lock()
        defer n.mtx.Unlock()
    
        var alerts []*Alert
    
        if len(n.queue) > maxBatchSize {
            alerts = append(make([]*Alert, 0, maxBatchSize), n.queue[:maxBatchSize]...)
            n.queue = n.queue[maxBatchSize:]
        } else {
            alerts = append(make([]*Alert, 0, len(n.queue)), n.queue...)
            n.queue = n.queue[:0]
        }
    
        return alerts
    }
    
    
    
    // Run dispatches notifications continuously.
    func (n *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) {
    
        for {
            select {
            case <-n.ctx.Done():
                return
            case ts := <-tsets:
                n.reload(ts)
            case <-n.more:
            }
            alerts := n.nextBatch()
    
            if !n.sendAll(alerts...) {
                n.metrics.dropped.Add(float64(len(alerts)))
            }
            // If the queue still has items left, kick off the next iteration.
            if n.queueLen() > 0 {
                n.setMore()
            }
        }
    }
    
    func (n *Manager) reload(tgs map[string][]*targetgroup.Group) {
        n.mtx.Lock()
        defer n.mtx.Unlock()
    
        for id, tgroup := range tgs {
            am, ok := n.alertmanagers[id]
            if !ok {
                level.Error(n.logger).Log("msg", "couldn't sync alert manager set", "err", fmt.Sprintf("invalid id:%v", id))
                continue
            }
            am.sync(tgroup)
        }
    }
    
    func (s *alertmanagerSet) sync(tgs []*targetgroup.Group) {
        allAms := []alertmanager{}
        allDroppedAms := []alertmanager{}
    
        for _, tg := range tgs {
            ams, droppedAms, err := alertmanagerFromGroup(tg, s.cfg)
            if err != nil {
                level.Error(s.logger).Log("msg", "Creating discovered Alertmanagers failed", "err", err)
                continue
            }
            allAms = append(allAms, ams...)
            allDroppedAms = append(allDroppedAms, droppedAms...)
        }
    
        s.mtx.Lock()
        defer s.mtx.Unlock()
        // Set new Alertmanagers and deduplicate them along their unique URL.
        s.ams = []alertmanager{}
        s.droppedAms = []alertmanager{}
        s.droppedAms = append(s.droppedAms, allDroppedAms...)
        seen := map[string]struct{}{}
    
        for _, am := range allAms {
            us := am.url().String()
            if _, ok := seen[us]; ok {
                continue
            }
    
            // This will initialize the Counters for the AM to 0.
            s.metrics.sent.WithLabelValues(us)
            s.metrics.errors.WithLabelValues(us)
    
            seen[us] = struct{}{}
            s.ams = append(s.ams, am)
        }
    }
    // Send queues the given notification requests for processing.
    // Panics if called on a handler that is not running.
    func (n *Manager) Send(alerts ...*Alert) {
        n.mtx.Lock()
        defer n.mtx.Unlock()
    
        // Attach external labels before relabelling and sending.
        for _, a := range alerts {
            lb := labels.NewBuilder(a.Labels)
    
            for _, l := range n.opts.ExternalLabels {
                if a.Labels.Get(l.Name) == "" {
                    lb.Set(l.Name, l.Value)
                }
            }
    
            a.Labels = lb.Labels()
        }
    
        alerts = n.relabelAlerts(alerts)
        if len(alerts) == 0 {
            return
        }
    
        // Queue capacity should be significantly larger than a single alert
        // batch could be.
        if d := len(alerts) - n.opts.QueueCapacity; d > 0 {
            alerts = alerts[d:]
    
            level.Warn(n.logger).Log("msg", "Alert batch larger than queue capacity, dropping alerts", "num_dropped", d)
            n.metrics.dropped.Add(float64(d))
        }
    
        // If the queue is full, remove the oldest alerts in favor
        // of newer ones.
        if d := (len(n.queue) + len(alerts)) - n.opts.QueueCapacity; d > 0 {
            n.queue = n.queue[d:]
    
            level.Warn(n.logger).Log("msg", "Alert notification queue full, dropping alerts", "num_dropped", d)
            n.metrics.dropped.Add(float64(d))
        }
        n.queue = append(n.queue, alerts...)
    
        // Notify sending goroutine that there are alerts to be processed.
        n.setMore()
    }
    
    
    // setMore signals that the alert queue has items.
    func (n *Manager) setMore() {
        // If we cannot send on the channel, it means the signal already exists
        // and has not been consumed yet.
        select {
        case n.more <- struct{}{}:
        default:
        }
    }
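
    setMore relies on a channel with a buffer of one: however many times Send
    signals while the Run loop is busy, at most one wake-up stays pending, and
    Run keeps draining the queue in batches until it is empty. A small,
    self-contained sketch of that coalescing pattern (the coalescer type and its
    names are illustrative, not part of Prometheus):

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // coalescer wakes a single consumer at most once per pending batch of work,
    // no matter how many producers call add in the meantime.
    type coalescer struct {
        mtx   sync.Mutex
        queue []int
        more  chan struct{} // capacity 1, like Manager.more
    }

    func newCoalescer() *coalescer {
        return &coalescer{more: make(chan struct{}, 1)}
    }

    func (c *coalescer) add(v int) {
        c.mtx.Lock()
        c.queue = append(c.queue, v)
        c.mtx.Unlock()
        // Non-blocking send, mirroring setMore: if a wake-up is already
        // pending, another one would be redundant.
        select {
        case c.more <- struct{}{}:
        default:
        }
    }

    func (c *coalescer) drain() []int {
        c.mtx.Lock()
        defer c.mtx.Unlock()
        batch := c.queue
        c.queue = nil
        return batch
    }

    func main() {
        c := newCoalescer()
        done := make(chan struct{})

        go func() {
            for range c.more {
                if batch := c.drain(); len(batch) > 0 {
                    fmt.Println("processing batch:", batch)
                }
            }
            close(done)
        }()

        for i := 0; i < 10; i++ {
            c.add(i) // many adds collapse into one or two wake-ups
        }
        time.Sleep(100 * time.Millisecond)
        close(c.more)
        <-done
    }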
    
    
    
    // sendAll sends the alerts to all configured Alertmanagers concurrently.
    // It returns true if the alerts could be sent successfully to at least one Alertmanager.
    func (n *Manager) sendAll(alerts ...*Alert) bool {
        if len(alerts) == 0 {
            return true
        }
    
        begin := time.Now()
    
        // v1Payload and v2Payload represent 'alerts' marshaled for Alertmanager API
        // v1 or v2. Marshaling happens below. Reference here is for caching between
        // for loop iterations.
        var v1Payload, v2Payload []byte
    
        n.mtx.RLock()
        amSets := n.alertmanagers
        n.mtx.RUnlock()
    
        var (
            wg         sync.WaitGroup
            numSuccess atomic.Uint64
        )
        for _, ams := range amSets {
            var (
                payload []byte
                err     error
            )
    
            ams.mtx.RLock()
    
            switch ams.cfg.APIVersion {
            case config.AlertmanagerAPIVersionV1:
                {
                    if v1Payload == nil {
                        v1Payload, err = json.Marshal(alerts)
                        if err != nil {
                            level.Error(n.logger).Log("msg", "Encoding alerts for Alertmanager API v1 failed", "err", err)
                            ams.mtx.RUnlock()
                            return false
                        }
                    }
    
                    payload = v1Payload
                }
            case config.AlertmanagerAPIVersionV2:
                {
                    if v2Payload == nil {
                        openAPIAlerts := alertsToOpenAPIAlerts(alerts)
    
                        v2Payload, err = json.Marshal(openAPIAlerts)
                        if err != nil {
                            level.Error(n.logger).Log("msg", "Encoding alerts for Alertmanager API v2 failed", "err", err)
                            ams.mtx.RUnlock()
                            return false
                        }
                    }
    
                    payload = v2Payload
                }
            default:
                {
                    level.Error(n.logger).Log(
                        "msg", fmt.Sprintf("Invalid Alertmanager API version '%v', expected one of '%v'", ams.cfg.APIVersion, config.SupportedAlertmanagerAPIVersions),
                        "err", err,
                    )
                    ams.mtx.RUnlock()
                    return false
                }
            }
    
            for _, am := range ams.ams {
                wg.Add(1)
    
                ctx, cancel := context.WithTimeout(n.ctx, time.Duration(ams.cfg.Timeout))
                defer cancel()
    
                go func(client *http.Client, url string) {
                    if err := n.sendOne(ctx, client, url, payload); err != nil {
                        level.Error(n.logger).Log("alertmanager", url, "count", len(alerts), "msg", "Error sending alert", "err", err)
                        n.metrics.errors.WithLabelValues(url).Inc()
                    } else {
                        numSuccess.Inc()
                    }
                    n.metrics.latency.WithLabelValues(url).Observe(time.Since(begin).Seconds())
                    n.metrics.sent.WithLabelValues(url).Add(float64(len(alerts)))
    
                    wg.Done()
                }(ams.client, am.url().String())
            }
    
            ams.mtx.RUnlock()
        }
    
        wg.Wait()
    
        return numSuccess.Load() > 0
    }
    
    func (n *Manager) sendOne(ctx context.Context, c *http.Client, url string, b []byte) error {
        req, err := http.NewRequest("POST", url, bytes.NewReader(b))
        if err != nil {
            return err
        }
        req.Header.Set("User-Agent", userAgent)
        req.Header.Set("Content-Type", contentTypeJSON)
        resp, err := n.opts.Do(ctx, c, req)
        if err != nil {
            return err
        }
        defer func() {
            io.Copy(ioutil.Discard, resp.Body)
            resp.Body.Close()
        }()
    
        // Any HTTP status 2xx is OK.
        if resp.StatusCode/100 != 2 {
            return errors.Errorf("bad response status %s", resp.Status)
        }
    
        return nil
    }
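
    sendOne is nothing more than an HTTP POST of the marshaled alert batch. The
    same request can be reproduced by hand with a stand-alone client; the sketch
    below assumes an Alertmanager reachable at localhost:9093 and posts a single
    example alert to its v2 endpoint (amURL and the label values are assumptions
    for illustration, the JSON field names follow the v2 API).

    package main

    import (
        "bytes"
        "encoding/json"
        "fmt"
        "net/http"
        "time"
    )

    // v2Alert mirrors the JSON shape accepted by Alertmanager's /api/v2/alerts.
    type v2Alert struct {
        Labels       map[string]string `json:"labels"`
        Annotations  map[string]string `json:"annotations,omitempty"`
        StartsAt     time.Time         `json:"startsAt,omitempty"`
        EndsAt       time.Time         `json:"endsAt,omitempty"`
        GeneratorURL string            `json:"generatorURL,omitempty"`
    }

    func main() {
        // Assumed address; adjust to your Alertmanager instance.
        const amURL = "http://localhost:9093/api/v2/alerts"

        alerts := []v2Alert{{
            Labels:      map[string]string{"alertname": "InstanceDown", "instance": "demo:9100", "severity": "warning"},
            Annotations: map[string]string{"summary": "demo:9100 is down"},
            StartsAt:    time.Now(),
        }}

        body, err := json.Marshal(alerts)
        if err != nil {
            panic(err)
        }

        resp, err := http.Post(amURL, "application/json", bytes.NewReader(body))
        if err != nil {
            panic(err)
        }
        defer resp.Body.Close()

        // Like sendOne, treat any 2xx status as success.
        if resp.StatusCode/100 != 2 {
            fmt.Println("bad response status:", resp.Status)
            return
        }
        fmt.Println("alert accepted:", resp.Status)
    }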
    
    

    In rules/manager.go:

    func (m *Manager) Run() {
        m.start()
        <-m.done
    }
    
    func (m *Manager) start() {
        close(m.block)
    }
    
    func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string) error {
        ...
        groups, errs := m.LoadGroups(interval, externalLabels, externalURL, files...)
        ...
        var wg sync.WaitGroup
        for _, newg := range groups {
            ...
            wg.Add(1)
            go func(newg *Group) {
                ...
                wg.Done()
                // Groups block here until Run (via start) closes m.block.
                <-m.block
                newg.run(m.opts.Context)
            }(newg)
        }
        wg.Wait()
        ...
    }
    
    func (m *Manager) LoadGroups(
        interval time.Duration, externalLabels labels.Labels, externalURL string, filenames ...string,
    ) (map[string]*Group, []error) {
    ...
                groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{
                    Name:          rg.Name,
                    File:          fn,
                    Interval:      itv,
                    Limit:         rg.Limit,
                    Rules:         rules,
                    ShouldRestore: shouldRestore,
                    Opts:          m.opts,
                    done:          m.done,
                })
    ...
    }
    
    func NewGroup(o GroupOptions) *Group {
        return &Group{
            name:                 o.Name,
            file:                 o.File,
            interval:             o.Interval,
            limit:                o.Limit,
            rules:                o.Rules,
            shouldRestore:        o.ShouldRestore,
            opts:                 o.Opts,
            seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)),
            done:                 make(chan struct{}),
            managerDone:          o.done,
            terminated:           make(chan struct{}),
            logger:               log.With(o.Opts.Logger, "group", o.Name),
            metrics:              metrics,
        }  
    }
    
    func (g *Group) run(ctx context.Context) {
        ...
        iter := func() {
            g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Inc()
    
            start := time.Now()
            g.Eval(ctx, evalTimestamp)
            timeSinceStart := time.Since(start)
    
            g.metrics.IterationDuration.Observe(timeSinceStart.Seconds())
            g.setEvaluationTime(timeSinceStart)
            g.setLastEvaluation(start)
        }
        ...
        for {
            select {
            case <-g.done:
                return
            default:
                select {
                case <-g.done:
                    return
                case <-tick.C:
                    ...
                    iter()
                }
            }
        }
        ...
    }
    
    func (g *Group) Eval(ctx context.Context, ts time.Time) {
        ...
        for i, rule := range g.rules {
            vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit())
            ...
            if ar, ok := rule.(*AlertingRule); ok {
            ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
            }
            ...
        }
    }
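
    Group.run uses a nested select: the outer select with a default case checks
    g.done first, so a stopped group never starts another iteration even when a
    tick is already waiting in the channel. A small, self-contained demo of that
    prioritization pattern (the names and durations here are illustrative):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        done := make(chan struct{})
        tick := time.NewTicker(50 * time.Millisecond)
        defer tick.Stop()

        iter := func() { fmt.Println("evaluate rule group at", time.Now().Format("15:04:05.000")) }

        go func() {
            time.Sleep(200 * time.Millisecond)
            close(done) // like the rule manager stopping the group
        }()

        for {
            // The outer select with a default checks done first, so shutdown
            // wins even if a tick is already pending.
            select {
            case <-done:
                return
            default:
                select {
                case <-done:
                    return
                case <-tick.C:
                    iter()
                }
            }
        }
    }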
    

    In rules/alerting.go:

    func (r *AlertingRule) sendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {
        alerts := []*Alert{}
        r.ForEachActiveAlert(func(alert *Alert) {
            if alert.needsSending(ts, resendDelay) {
                alert.LastSentAt = ts
                // Allow for two Eval or Alertmanager send failures.
                delta := resendDelay
                if interval > resendDelay {
                    delta = interval
                }
                alert.ValidUntil = ts.Add(4 * delta)
                anew := *alert
                alerts = append(alerts, &anew)
            }
        })
        notifyFunc(ctx, r.vector.String(), alerts...)
    }
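
    ValidUntil doubles as the EndsAt that sendAlerts in cmd/prometheus/main.go
    falls back to for still-firing alerts, so Alertmanager resolves an alert on
    its own if Prometheus stops resending it. With the default resend delay of
    1m and a 15s group interval, delta is 1m and ValidUntil lands 4m after the
    evaluation timestamp; a quick check of that arithmetic:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        // Mirrors the delta / ValidUntil computation in AlertingRule.sendAlerts.
        // Example values: 1m resend delay (the default), 15s group interval.
        resendDelay := 1 * time.Minute
        interval := 15 * time.Second
        ts := time.Now()

        delta := resendDelay
        if interval > resendDelay {
            delta = interval
        }
        validUntil := ts.Add(4 * delta)

        fmt.Println("alert stays valid for", validUntil.Sub(ts), "after this evaluation") // 4m0s
    }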
    

    In Alertmanager:

    In cmd/alertmanager/main.go:

    func main() {
        os.Exit(run())
    }
    
    func run() int {
        ...
        alerts, err := mem.NewAlerts(context.Background(), marker, *alertGCInterval, nil, logger)
        if err != nil {
            level.Error(logger).Log("err", err)
            return 1
        }
        ...
        api, err := api.New(api.Options{
            Alerts:      alerts,
            Silences:    silences,
            StatusFunc:  marker.Status,
            Peer:        clusterPeer,
            Timeout:     *httpTimeout,
            Concurrency: *getConcurrency,
            Logger:      log.With(logger, "component", "api"),
            Registry:    prometheus.DefaultRegisterer,
            GroupFunc:   groupFn,
        })
        ...
        inhibitor = inhibit.NewInhibitor(alerts, conf.InhibitRules, marker, logger)
        silencer := silence.NewSilencer(silences, marker, logger)
        ...
        pipeline := pipelineBuilder.New(
                receivers,
                waitFunc,
                inhibitor,
                silencer,
                muteTimes,
                notificationLog,
                pipelinePeer,
        )
        disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, nil, logger, dispMetrics)
        ...
        go disp.Run()
        go inhibitor.Run()
        ...
    }
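
    pipelineBuilder.New assembles, roughly, a chain of stages (inhibition and
    silence muting, time muting, dedup against the notification log, then the
    receiver integrations), and the dispatcher's aggregation groups run every
    flush through that chain. The sketch below is a heavily simplified,
    self-contained stand-in for the Stage idea; the real notify.Stage.Exec also
    threads a context and a logger and is not this interface.

    package main

    import "fmt"

    type alert struct {
        labels map[string]string
    }

    // Stage is a stripped-down stand-in: take a batch of alerts, maybe drop
    // some, pass the rest on.
    type Stage interface {
        Exec(alerts []alert) ([]alert, error)
    }

    // multiStage runs stages in order, feeding each stage's output to the next.
    type multiStage []Stage

    func (ms multiStage) Exec(alerts []alert) ([]alert, error) {
        var err error
        for _, s := range ms {
            alerts, err = s.Exec(alerts)
            if err != nil {
                return nil, err
            }
        }
        return alerts, nil
    }

    // muteStage drops alerts matching a predicate; silences and inhibitions
    // both behave like this at a high level.
    type muteStage struct {
        muted func(alert) bool
    }

    func (m muteStage) Exec(alerts []alert) ([]alert, error) {
        var out []alert
        for _, a := range alerts {
            if !m.muted(a) {
                out = append(out, a)
            }
        }
        return out, nil
    }

    // sendStage stands in for the final receiver integration.
    type sendStage struct{}

    func (sendStage) Exec(alerts []alert) ([]alert, error) {
        for _, a := range alerts {
            fmt.Println("notify:", a.labels["alertname"])
        }
        return alerts, nil
    }

    func main() {
        pipeline := multiStage{
            muteStage{muted: func(a alert) bool { return a.labels["severity"] == "none" }},
            sendStage{},
        }
        _, _ = pipeline.Exec([]alert{
            {labels: map[string]string{"alertname": "InstanceDown", "severity": "critical"}},
            {labels: map[string]string{"alertname": "Noise", "severity": "none"}},
        })
    }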
    

    In dispatch/dispatch.go:

    func NewDispatcher(
        ap provider.Alerts,
        r *Route,
        s notify.Stage,
        mk types.Marker,
        to func(time.Duration) time.Duration,
        lim Limits,
        l log.Logger,
        m *DispatcherMetrics,
    ) *Dispatcher {
        if lim == nil {
            lim = nilLimits{}
        }
    
        disp := &Dispatcher{
            alerts:  ap,
            stage:   s,
            route:   r,
            marker:  mk,
            timeout: to,
            logger:  log.With(l, "component", "dispatcher"),
            metrics: m,
            limits:  lim,
        }
        return disp
    }
    
    func (d *Dispatcher) Run() {
        ...
        d.run(d.alerts.Subscribe())
        ...
    }
    
    func (d *Dispatcher) run(it provider.AlertIterator) {
        ...
        for {
            select {
            case alert, ok := <-it.Next():
                ...
                for _, r := range d.route.Match(alert.Labels) {
                    d.processAlert(alert, r)
                }
                ...
            }
        }
    }
    
    func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
        ...
    
        go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
            _, _, err := d.stage.Exec(ctx, d.logger, alerts...)
            ...
            return err == nil
        })
        ...
    }
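
    The dispatcher subscribes to the alert provider and, for every incoming
    alert, asks the routing tree which routes match its labels, then hands the
    alert to an aggregation group per matching route. A simplified,
    self-contained sketch of that label-matching step follows; the route type is
    a stand-in for dispatch.Route and only handles equality matchers, roughly
    what the real tree does without continue or regex matchers.

    package main

    import "fmt"

    // route is a stripped-down stand-in for dispatch.Route: a receiver name
    // plus equality matchers, with child routes underneath.
    type route struct {
        receiver string
        match    map[string]string
        routes   []*route
    }

    func (r *route) matches(labels map[string]string) bool {
        for k, v := range r.match {
            if labels[k] != v {
                return false
            }
        }
        return true
    }

    // Match returns the deepest matching route, falling back to the current
    // node when no child matches.
    func (r *route) Match(labels map[string]string) []*route {
        if !r.matches(labels) {
            return nil
        }
        for _, child := range r.routes {
            if m := child.Match(labels); len(m) > 0 {
                return m
            }
        }
        return []*route{r}
    }

    func main() {
        root := &route{
            receiver: "default",
            routes: []*route{
                {receiver: "pager", match: map[string]string{"severity": "critical"}},
                {receiver: "chat", match: map[string]string{"team": "web"}},
            },
        }

        alertLabels := map[string]string{"alertname": "InstanceDown", "severity": "critical"}
        for _, r := range root.Match(alertLabels) {
            fmt.Println("dispatch to receiver:", r.receiver)
        }
    }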
    

    In api/api.go:

    func New(opts Options) (*API, error) {
        ...
        v1 := apiv1.New(
            opts.Alerts,
            opts.Silences,
            opts.StatusFunc,
            opts.Peer,
            log.With(l, "version", "v1"),
            opts.Registry,
        )
        ...
    }
    
    

    In api/v1/api.go:

    func New(
        alerts provider.Alerts,
        silences *silence.Silences,
        sf getAlertStatusFn,
        peer cluster.ClusterPeer,
        l log.Logger,
        r prometheus.Registerer,
    ) *API {
        if l == nil {
            l = log.NewNopLogger()
        }
    
        return &API{
            alerts:         alerts,
            silences:       silences,
            getAlertStatus: sf,
            uptime:         time.Now(),
            peer:           peer,
            logger:         l,
            m:              metrics.NewAlerts("v1", r),
        }
    }
    
    func (api *API) Register(r *route.Router) {
        wrap := func(f http.HandlerFunc) http.HandlerFunc {
            return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                setCORS(w)
                f(w, r)
            })
        }
    
        r.Options("/*path", wrap(func(w http.ResponseWriter, r *http.Request) {}))
    
        r.Get("/status", wrap(api.status))
        r.Get("/receivers", wrap(api.receivers))
    
        r.Get("/alerts", wrap(api.listAlerts))
        r.Post("/alerts", wrap(api.addAlerts))
    
        r.Get("/silences", wrap(api.listSilences))
        r.Post("/silences", wrap(api.setSilence))
        r.Get("/silence/:sid", wrap(api.getSilence))
        r.Del("/silence/:sid", wrap(api.delSilence))
    }
    
    func (api *API) addAlerts(w http.ResponseWriter, r *http.Request) {
        var alerts []*types.Alert
        if err := api.receive(r, &alerts); err != nil {
            api.respondError(w, apiError{
                typ: errorBadData,
                err: err,
            }, nil)
            return
        }
    
        api.insertAlerts(w, r, alerts...)
    }
    
    func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*types.Alert) {
        ...
        if err := api.alerts.Put(validAlerts...); err != nil {
            api.respondError(w, apiError{
                typ: errorInternal,
                err: err,
            }, nil)
            return
        }
        ...
    }
    
    

    In provider/mem/mem.go:

    func NewAlerts(ctx context.Context, m types.Marker, intervalGC time.Duration, alertCallback AlertStoreCallback, l log.Logger) (*Alerts, error) {
        ...
        a := &Alerts{
            alerts:    store.NewAlerts(),
            cancel:    cancel,
            listeners: map[int]listeningAlerts{},
            next:      0,
            logger:    log.With(l, "component", "provider"),
            callback:  alertCallback,
        }
        ...
        go a.alerts.Run(ctx, intervalGC)
        ...
    }
    
    func (a *Alerts) Subscribe() provider.AlertIterator {
        a.mtx.Lock()
        defer a.mtx.Unlock()
    
        var (
            done   = make(chan struct{})
            alerts = a.alerts.List()
            ch     = make(chan *types.Alert, max(len(alerts), alertChannelLength))
        )
    
        for _, a := range alerts {
            ch <- a
        }
    
        a.listeners[a.next] = listeningAlerts{alerts: ch, done: done}
        a.next++
    
        return provider.NewAlertIterator(ch, done, nil)
    }
    
    func (a *Alerts) Put(alerts ...*types.Alert) error {
        for _, alert := range alerts {
            ...
            for _, l := range a.listeners {
                select {
                case l.alerts <- alert:
                case <-l.done:
                }
            }
        }
        ...
    }
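
    Subscribe gives each consumer (the dispatcher, the inhibitor) its own
    buffered channel pre-filled with the current alerts, and Put fans every new
    alert out to all registered listeners, so each subscriber sees the full
    stream. A minimal, self-contained version of that fan-out broker (the types,
    names and buffer size are illustrative, not the real provider/mem code):

    package main

    import (
        "fmt"
        "sync"
    )

    type alert struct{ name string }

    // broker is a stripped-down stand-in for provider/mem.Alerts: Put fans
    // alerts out to every channel handed out by Subscribe.
    type broker struct {
        mtx       sync.Mutex
        current   []alert
        listeners []chan alert
    }

    func (b *broker) Subscribe() <-chan alert {
        b.mtx.Lock()
        defer b.mtx.Unlock()
        // Buffer at least the current alerts so the pre-fill cannot block.
        ch := make(chan alert, len(b.current)+16)
        for _, a := range b.current {
            ch <- a
        }
        b.listeners = append(b.listeners, ch)
        return ch
    }

    func (b *broker) Put(alerts ...alert) {
        b.mtx.Lock()
        defer b.mtx.Unlock()
        b.current = append(b.current, alerts...)
        for _, a := range alerts {
            for _, l := range b.listeners {
                l <- a // the real code also selects on the listener's done channel
            }
        }
    }

    func main() {
        b := &broker{}
        b.Put(alert{name: "existing"})

        sub := b.Subscribe() // immediately sees "existing"
        b.Put(alert{name: "new"})

        for i := 0; i < 2; i++ {
            fmt.Println("received:", (<-sub).name)
        }
    }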
    

    In store/store.go:

    func NewAlerts() *Alerts {
        a := &Alerts{
            c:  make(map[model.Fingerprint]*types.Alert),
            cb: func(_ []*types.Alert) {},
        }
    
        return a
    }
    func (a *Alerts) Run(ctx context.Context, interval time.Duration) {
        t := time.NewTicker(interval)
        defer t.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case <-t.C:
                a.gc()
            }
        }
    }
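
    The ticker in Run drives a.gc, which is not shown above; the idea is that
    the store walks its map and drops alerts that have already resolved, i.e.
    whose EndsAt is set and lies in the past. A tiny illustrative sketch of that
    idea with a simplified alert type (not the real store code):

    package main

    import (
        "fmt"
        "time"
    )

    type alert struct {
        name   string
        endsAt time.Time
    }

    // resolved mirrors the idea of types.Alert.Resolved: EndsAt is set and has
    // already passed.
    func (a alert) resolved(now time.Time) bool {
        return !a.endsAt.IsZero() && !a.endsAt.After(now)
    }

    // gc drops resolved alerts from the store, as the ticker in Run triggers.
    func gc(store map[string]alert, now time.Time) {
        for k, a := range store {
            if a.resolved(now) {
                delete(store, k)
            }
        }
    }

    func main() {
        now := time.Now()
        store := map[string]alert{
            "firing":   {name: "firing", endsAt: now.Add(5 * time.Minute)},
            "resolved": {name: "resolved", endsAt: now.Add(-5 * time.Minute)},
        }
        gc(store, now)
        for name := range store {
            fmt.Println("still stored:", name) // only "firing" remains
        }
    }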
    
