美文网首页
聊聊promtail的Client

聊聊promtail的Client

作者: go4it | 来源:发表于2021-01-21 00:01 被阅读0次

    本文主要研究一下promtail的Client

    Client

    loki/pkg/promtail/client/client.go

    // Client pushes entries to Loki and can be stopped.
    // It embeds api.EntryHandler, so it exposes that interface's methods in
    // addition to Stop.
    type Client interface {
        api.EntryHandler
        // Stop stops the goroutine that sends batches of entries.
        Stop()
    }
    

    Client接口内嵌了api.EntryHandler接口,定义了Stop方法

    EntryHandler

    loki/pkg/promtail/api/types.go

    // EntryHandler is something that can "handle" entries.
    type EntryHandler interface {
        // Handle receives a single entry, described by its label set, its
        // timestamp and its text, and reports an error if the entry could not
        // be handled.
        Handle(labels model.LabelSet, time time.Time, entry string) error
    }
    

    EntryHandler接口定义了Handle方法

    client

    loki/pkg/promtail/client/client.go

    // Client for pushing logs in snappy-compressed protos over HTTP.
    type client struct {
        logger  log.Logger     // receives the error/warning logs emitted by sendBatch
        cfg     Config         // settings read by run, sendBatch and send (BatchWait, BatchSize, BackoffConfig, URL, Timeout)
        client  *http.Client   // HTTP client used by send to perform the push request
        quit    chan struct{}  // closed by Stop; run returns once it is closed
        once    sync.Once      // guarantees quit is closed at most once
        entries chan entry     // Handle sends entries here; run receives and batches them
        wg      sync.WaitGroup // run calls Done on exit; Stop waits on it
    
        // externalLabels is merged with the labels of every handled entry
        // when it is non-empty.
        externalLabels model.LabelSet
    }
    
    // Handle implements EntryHandler. It queues one log line for the next
    // batch; the actual push happens asynchronously. It always returns nil.
    func (c *client) Handle(ls model.LabelSet, t time.Time, s string) error {
        labels := ls
        if len(c.externalLabels) != 0 {
            labels = c.externalLabels.Merge(labels)
        }

        // The tenant ID may have been overridden by the pipeline stages via a
        // reserved label. Resolve it first, then strip that label so it is
        // not queued with the entry.
        tenant := c.getTenantID(labels)
        if _, overridden := labels[ReservedLabelTenantID]; overridden {
            // Work on a copy so the caller's label set is left untouched.
            stripped := labels.Clone()
            delete(stripped, ReservedLabelTenantID)
            labels = stripped
        }

        line := logproto.Entry{
            Timestamp: t,
            Line:      s,
        }
        c.entries <- entry{tenant, labels, line}
        return nil
    }
    
    // Stop closes the quit channel (at most once, however many times Stop is
    // called) and then blocks until the client's WaitGroup is released.
    func (c *client) Stop() {
        closeQuit := func() {
            close(c.quit)
        }
        c.once.Do(closeQuit)
        c.wg.Wait()
    }
    

    client定义了logger、cfg、client、quit、once、entries、wg、externalLabels属性;它实现了Client接口的Handle、Stop方法;Handle方法先在externalLabels非空时将其与传入的LabelSet合并,再通过getTenantID获取tenantID,然后判断LabelSet是否包含ReservedLabelTenantID,如果包含则先执行ls.Clone()再从副本中移除该label,之后构造entry发送到c.entries这个channel;Stop方法通过once保证只执行一次close(c.quit),然后执行c.wg.Wait()等待run退出

    run

    loki/pkg/promtail/client/client.go

    // run is the client's batching loop. It keeps one pending batch per tenant
    // and sends a batch when either adding the next entry would push it past
    // c.cfg.BatchSize, or its age has reached c.cfg.BatchWait. The loop exits
    // when c.quit is closed; on exit it stops the ticker, sends every batch
    // still pending and calls c.wg.Done().
    //
    // NOTE(review): because of the wg.Done() call this is presumably started
    // as a goroutine after a matching c.wg.Add(1) — confirm against the
    // constructor.
    func (c *client) run() {
        batches := map[string]*batch{}

        // Given the client handles multiple batches (1 per tenant) and each batch
        // can be created at a different point in time, we look for batches whose
        // max wait time has been reached 10 times per BatchWait, so that the
        // maximum delay we have sending batches is 10% of the max waiting time.
        // The check interval is never allowed to drop below 10ms, to avoid too
        // frequent checks in case the BatchWait is very low.
        minWaitCheckFrequency := 10 * time.Millisecond
        maxWaitCheckFrequency := c.cfg.BatchWait / 10
        if maxWaitCheckFrequency < minWaitCheckFrequency {
            maxWaitCheckFrequency = minWaitCheckFrequency
        }

        maxWaitCheck := time.NewTicker(maxWaitCheckFrequency)

        defer func() {
            // Release the ticker: nothing reads from it once the loop has
            // exited, and an unstopped ticker keeps its resources alive.
            maxWaitCheck.Stop()

            // Send all pending batches
            for tenantID, batch := range batches {
                c.sendBatch(tenantID, batch)
            }

            c.wg.Done()
        }()

        for {
            select {
            case <-c.quit:
                return

            case e := <-c.entries:
                batch, ok := batches[e.tenantID]

                // If the batch doesn't exist yet, we create a new one with the entry
                if !ok {
                    batches[e.tenantID] = newBatch(e)
                    break
                }

                // If adding the entry to the batch will increase the size over the max
                // size allowed, we do send the current batch and then create a new one
                if batch.sizeBytesAfter(e) > c.cfg.BatchSize {
                    c.sendBatch(e.tenantID, batch)

                    batches[e.tenantID] = newBatch(e)
                    break
                }

                // The max size of the batch isn't reached, so we can add the entry
                batch.add(e)

            case <-maxWaitCheck.C:
                // Send all batches whose max wait time has been reached
                for tenantID, batch := range batches {
                    if batch.age() < c.cfg.BatchWait {
                        continue
                    }

                    c.sendBatch(tenantID, batch)
                    delete(batches, tenantID)
                }
            }
        }
    }
    

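    client的run方法创建time.NewTicker(maxWaitCheckFrequency),然后for循环:如果是c.entries读取到了数据,则在该tenant还没有batch时通过newBatch(e)创建新的batch,在加入该entry后大小会超过c.cfg.BatchSize时先执行c.sendBatch发送当前batch再创建新的batch,否则执行batch.add(e);如果是maxWaitCheck触发了则遍历batches,对batch.age()已达到c.cfg.BatchWait的batch执行c.sendBatch(tenantID, batch)及delete;最后quit的时候,还有defer方法遍历batches执行c.sendBatch(tenantID, batch)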

    sendBatch

    loki/pkg/promtail/client/client.go

    // sendBatch encodes the batch and pushes it on behalf of tenantID, retrying
    // under c.cfg.BackoffConfig. Only HTTP 429, HTTP 5xx and failures that did
    // not yield a positive status code are retried. On success it updates the
    // sent-bytes/sent-entries metrics and, for streams carrying LatencyLabel,
    // the stream lag gauge. If the batch could not be delivered, the
    // dropped-bytes/dropped-entries metrics are incremented instead.
    func (c *client) sendBatch(tenantID string, batch *batch) {
        payload, numEntries, err := batch.encode()
        if err != nil {
            level.Error(c.logger).Log("msg", "error encoding batch", "error", err)
            return
        }

        host := c.cfg.URL.Host
        payloadSize := float64(len(payload))
        encodedBytes.WithLabelValues(host).Add(payloadSize)

        ctx := context.Background()
        retry := util.NewBackoff(ctx, c.cfg.BackoffConfig)
        var status int
        for retry.Ongoing() {
            began := time.Now()
            status, err = c.send(ctx, tenantID, payload)
            elapsed := time.Since(began).Seconds()
            requestDuration.WithLabelValues(strconv.Itoa(status), host).Observe(elapsed)

            if err == nil {
                sentBytes.WithLabelValues(host).Add(payloadSize)
                sentEntries.WithLabelValues(host).Add(float64(numEntries))

                for _, stream := range batch.streams {
                    parsed, parseErr := parser.ParseMetric(stream.Labels)
                    if parseErr != nil {
                        // is this possible?
                        level.Warn(c.logger).Log("msg", "error converting stream label string to label.Labels, cannot update lagging metric", "error", parseErr)
                        return
                    }

                    // If the latency label occurs more than once, the last
                    // occurrence wins.
                    var lagLabels model.LabelSet
                    for _, lbl := range parsed {
                        if lbl.Name != LatencyLabel {
                            continue
                        }
                        lagLabels = model.LabelSet{
                            model.LabelName(HostLabel):    model.LabelValue(host),
                            model.LabelName(LatencyLabel): model.LabelValue(lbl.Value),
                        }
                    }
                    if lagLabels == nil {
                        continue
                    }

                    // Lag is measured from the timestamp of the stream's last entry.
                    newest := stream.Entries[len(stream.Entries)-1]
                    streamLag.With(lagLabels).Set(time.Since(newest.Timestamp).Seconds())
                }
                return
            }

            // Only retry 429s, 500s and connection-level errors.
            retryable := status <= 0 || status == 429 || status/100 == 5
            if !retryable {
                break
            }

            level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "error", err)
            batchRetries.WithLabelValues(host).Inc()
            retry.Wait()
        }

        if err == nil {
            return
        }

        level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "error", err)
        droppedBytes.WithLabelValues(host).Add(payloadSize)
        droppedEntries.WithLabelValues(host).Add(float64(numEntries))
    }
    

    sendBatch方法先通过batch.encode()编码为buf,然后通过c.send(ctx, tenantID, buf)进行发送;发送失败时仅对429、5xx以及连接级别的错误按c.cfg.BackoffConfig进行重试,最终仍然失败则记录droppedBytes、droppedEntries指标

    send

    loki/pkg/promtail/client/client.go

    // send POSTs buf to c.cfg.URL, bounded by c.cfg.Timeout, and returns the
    // HTTP status code of the response. When the request cannot be built or
    // performed it returns -1 and the underlying error. For a non-2xx response
    // it returns the status code together with an error that carries the first
    // line of the response body (read through a maxErrMsgLen limit).
    func (c *client) send(ctx context.Context, tenantID string, buf []byte) (int, error) {
        reqCtx, cancel := context.WithTimeout(ctx, c.cfg.Timeout)
        defer cancel()

        req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, c.cfg.URL.String(), bytes.NewReader(buf))
        if err != nil {
            return -1, err
        }
        req.Header.Set("Content-Type", contentType)
        req.Header.Set("User-Agent", UserAgent)

        // A non-empty tenant ID means promtail is running in multi-tenant
        // mode, so it is forwarded to Loki in the org ID header.
        if tenantID != "" {
            req.Header.Set("X-Scope-OrgID", tenantID)
        }

        resp, err := c.client.Do(req)
        if err != nil {
            return -1, err
        }
        defer helpers.LogError("closing response body", resp.Body.Close)

        if resp.StatusCode/100 == 2 {
            return resp.StatusCode, nil
        }

        // Surface the first line of the error body to the caller.
        var firstLine string
        if sc := bufio.NewScanner(io.LimitReader(resp.Body, maxErrMsgLen)); sc.Scan() {
            firstLine = sc.Text()
        }
        return resp.StatusCode, fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, resp.StatusCode, firstLine)
    }
    

    send方法执行一个POST的http请求发送到c.cfg.URL.String()

    小结

    promtail的client定义了logger、cfg、client、quit、once、entries、wg、externalLabels属性;它实现了Client接口的Handle、Stop方法;Handle方法构造entry发送到c.entries这个channel;Stop方法执行close(c.quit);然后它还有一个run方法将entry添加到batch,然后将batch通过http的POST请求发送到指定的地址。

    doc

    相关文章

      网友评论

          本文标题:聊聊promtail的Client

          本文链接:https://www.haomeiwen.com/subject/rhzlzktx.html