A Look at cortex's Distributor

Author: go4it | Published 2021-01-25 23:51

    This article takes a look at cortex's Distributor.

    Distributor

    cortex/pkg/distributor/distributor.go

    // Distributor is a storage.SampleAppender and a client.Querier which
    // forwards appends and queries to individual ingesters.
    type Distributor struct {
        services.Service
    
        cfg           Config
        ingestersRing ring.ReadRing
        ingesterPool  *ring_client.Pool
        limits        *validation.Overrides
    
        // The global rate limiter requires a distributors ring to count
        // the number of healthy instances
        distributorsRing *ring.Lifecycler
    
        // For handling HA replicas.
        HATracker *haTracker
    
        // Per-user rate limiter.
        ingestionRateLimiter *limiter.RateLimiter
    
        // Manager for subservices (HA Tracker, distributor ring and client pool)
        subservices        *services.Manager
        subservicesWatcher *services.FailureWatcher
    }
    

    The Distributor implements storage.SampleAppender and client.Querier, forwarding appends and queries to the individual ingesters.
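
    The ingestionRateLimiter field gates each tenant's write volume before any samples are forwarded. Below is a minimal, standalone sketch of that idea using golang.org/x/time/rate; the tenantLimiter type and allowN method are illustrative only, and cortex's own limiter.RateLimiter additionally shares the budget across distributors by counting healthy instances in the distributors ring.

    package main

    import (
        "fmt"
        "sync"
        "time"

        "golang.org/x/time/rate"
    )

    // tenantLimiter keeps one token bucket per tenant, mirroring the
    // "per-user rate limiter" role of ingestionRateLimiter in the struct above.
    type tenantLimiter struct {
        mu       sync.Mutex
        limiters map[string]*rate.Limiter
        limit    rate.Limit // items (samples + metadata) per second per tenant
        burst    int
    }

    // allowN reports whether tenant userID may ingest n items right now,
    // roughly what Push asks via d.ingestionRateLimiter.AllowN(now, userID, totalN).
    func (t *tenantLimiter) allowN(userID string, n int) bool {
        t.mu.Lock()
        l, ok := t.limiters[userID]
        if !ok {
            l = rate.NewLimiter(t.limit, t.burst)
            t.limiters[userID] = l
        }
        t.mu.Unlock()
        return l.AllowN(time.Now(), n)
    }

    func main() {
        tl := &tenantLimiter{limiters: map[string]*rate.Limiter{}, limit: 100, burst: 200}
        fmt.Println(tl.allowN("tenant-1", 150)) // true: within the burst budget
        fmt.Println(tl.allowN("tenant-1", 150)) // false: the bucket is exhausted
    }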

    Push

    cortex/pkg/distributor/distributor.go

    // Push implements client.IngesterServer
    func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
        userID, err := tenant.TenantID(ctx)
        if err != nil {
            return nil, err
        }
        source := util.GetSourceIPsFromOutgoingCtx(ctx)
    
        var firstPartialErr error
        removeReplica := false
    
        numSamples := 0
        for _, ts := range req.Timeseries {
            numSamples += len(ts.Samples)
        }
        // Count the total samples in, prior to validation or deduplication, for comparison with other metrics.
        incomingSamples.WithLabelValues(userID).Add(float64(numSamples))
        // Count the total number of metadata in.
        incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata)))
    
        // A WriteRequest can only contain series or metadata but not both. This might change in the future.
        // For each timeseries or samples, we compute a hash to distribute across ingesters;
        // check each sample/metadata and discard if outside limits.
        validatedTimeseries := make([]client.PreallocTimeseries, 0, len(req.Timeseries))
        validatedMetadata := make([]*client.MetricMetadata, 0, len(req.Metadata))
        metadataKeys := make([]uint32, 0, len(req.Metadata))
        seriesKeys := make([]uint32, 0, len(req.Timeseries))
        validatedSamples := 0
    
        if d.limits.AcceptHASamples(userID) && len(req.Timeseries) > 0 {
            cluster, replica := findHALabels(d.limits.HAReplicaLabel(userID), d.limits.HAClusterLabel(userID), req.Timeseries[0].Labels)
            removeReplica, err = d.checkSample(ctx, userID, cluster, replica)
            if err != nil {
                // Ensure the request slice is reused if the series get deduped.
                client.ReuseSlice(req.Timeseries)
    
                if errors.Is(err, replicasNotMatchError{}) {
                    // These samples have been deduped.
                    dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numSamples))
                    return nil, httpgrpc.Errorf(http.StatusAccepted, err.Error())
                }
    
                if errors.Is(err, tooManyClustersError{}) {
                    validation.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(numSamples))
                    return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
                }
    
                return nil, err
            }
            // If there wasn't an error but removeReplica is false that means we didn't find both HA labels.
            if !removeReplica {
                nonHASamples.WithLabelValues(userID).Add(float64(numSamples))
            }
        }
    
        latestSampleTimestampMs := int64(0)
        defer func() {
            // Update this metric even in case of errors.
            if latestSampleTimestampMs > 0 {
                latestSeenSampleTimestampPerUser.WithLabelValues(userID).Set(float64(latestSampleTimestampMs) / 1000)
            }
        }()
    
        // For each timeseries, compute a hash to distribute across ingesters;
        // check each sample and discard if outside limits.
        for _, ts := range req.Timeseries {
            // Use timestamp of latest sample in the series. If samples for series are not ordered, metric for user may be wrong.
            if len(ts.Samples) > 0 {
                latestSampleTimestampMs = util.Max64(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)
            }
    
            if mrc := d.limits.MetricRelabelConfigs(userID); len(mrc) > 0 {
                l := relabel.Process(client.FromLabelAdaptersToLabels(ts.Labels), mrc...)
                ts.Labels = client.FromLabelsToLabelAdapters(l)
            }
    
            // If we found both the cluster and replica labels, we only want to include the cluster label when
            // storing series in Cortex. If we kept the replica label we would end up with another series for the same
            // series we're trying to dedupe when HA tracking moves over to a different replica.
            if removeReplica {
                removeLabel(d.limits.HAReplicaLabel(userID), &ts.Labels)
            }
    
            for _, labelName := range d.limits.DropLabels(userID) {
                removeLabel(labelName, &ts.Labels)
            }
    
            if len(ts.Labels) == 0 {
                continue
            }
    
            // We rely on sorted labels in different places:
            // 1) When computing token for labels, and sharding by all labels. Here different order of labels returns
            // different tokens, which is bad.
            // 2) In validation code, when checking for duplicate label names. As duplicate label names are rejected
            // later in the validation phase, we ignore them here.
            sortLabelsIfNeeded(ts.Labels)
    
            // Generate the sharding token based on the series labels without the HA replica
            // label and dropped labels (if any)
            key, err := d.tokenForLabels(userID, ts.Labels)
            if err != nil {
                return nil, err
            }
    
            validatedSeries, err := d.validateSeries(ts, userID)
    
            // Errors in validation are considered non-fatal, as one series in a request may contain
            // invalid data but all the remaining series could be perfectly valid.
            if err != nil && firstPartialErr == nil {
                firstPartialErr = err
            }
    
            // validateSeries would have returned an emptyPreallocSeries if there were no valid samples.
            if validatedSeries == emptyPreallocSeries {
                continue
            }
    
            seriesKeys = append(seriesKeys, key)
            validatedTimeseries = append(validatedTimeseries, validatedSeries)
            validatedSamples += len(ts.Samples)
        }
    
        for _, m := range req.Metadata {
            err := validation.ValidateMetadata(d.limits, userID, m)
    
            if err != nil {
                if firstPartialErr == nil {
                    firstPartialErr = err
                }
    
                continue
            }
    
            metadataKeys = append(metadataKeys, d.tokenForMetadata(userID, m.MetricFamilyName))
            validatedMetadata = append(validatedMetadata, m)
        }
    
        receivedSamples.WithLabelValues(userID).Add(float64(validatedSamples))
        receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata)))
    
        if len(seriesKeys) == 0 && len(metadataKeys) == 0 {
            // Ensure the request slice is reused if there's no series or metadata passing the validation.
            client.ReuseSlice(req.Timeseries)
    
            return &client.WriteResponse{}, firstPartialErr
        }
    
        now := time.Now()
        totalN := validatedSamples + len(validatedMetadata)
        if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
            // Ensure the request slice is reused if the request is rate limited.
            client.ReuseSlice(req.Timeseries)
    
            // Return a 4xx here to have the client discard the data and not retry. If a client
            // is sending too much data consistently we will unlikely ever catch up otherwise.
            validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamples))
            validation.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
            return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata))
        }
    
        subRing := d.ingestersRing
    
        // Obtain a subring if required.
        if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
            subRing = d.ingestersRing.ShuffleShard(userID, d.limits.IngestionTenantShardSize(userID))
        }
    
        keys := append(seriesKeys, metadataKeys...)
        initialMetadataIndex := len(seriesKeys)
    
        op := ring.WriteNoExtend
        if d.cfg.ExtendWrites {
            op = ring.Write
        }
    
        err = ring.DoBatch(ctx, op, subRing, keys, func(ingester ring.IngesterDesc, indexes []int) error {
            timeseries := make([]client.PreallocTimeseries, 0, len(indexes))
            var metadata []*client.MetricMetadata
    
            for _, i := range indexes {
                if i >= initialMetadataIndex {
                    metadata = append(metadata, validatedMetadata[i-initialMetadataIndex])
                } else {
                    timeseries = append(timeseries, validatedTimeseries[i])
                }
            }
    
            // Use a background context to make sure all ingesters get samples even if we return early
            localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
            defer cancel()
            localCtx = user.InjectOrgID(localCtx, userID)
            if sp := opentracing.SpanFromContext(ctx); sp != nil {
                localCtx = opentracing.ContextWithSpan(localCtx, sp)
            }
    
            // Get clientIP(s) from Context and add it to localCtx
            localCtx = util.AddSourceIPsToOutgoingContext(localCtx, source)
    
            return d.send(localCtx, ingester, timeseries, metadata, req.Source)
        }, func() { client.ReuseSlice(req.Timeseries) })
        if err != nil {
            return nil, err
        }
        return &client.WriteResponse{}, firstPartialErr
    }
    

    When d.cfg.ShardingStrategy is util.ShardingStrategyShuffle, the Push method determines the subRing via d.ingestersRing.ShuffleShard; it then submits the keys via ring.DoBatch, whose callback executes d.send(localCtx, ingester, timeseries, metadata, req.Source).
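
    The key that drives this placement comes from d.tokenForLabels, which hashes the tenant and the series labels into a uint32 ring token. The following is a simplified, standalone sketch of that idea using FNV-1a over the sorted label pairs; treat the exact hashing scheme as an assumption, since depending on configuration cortex may shard by metric name only.

    package main

    import (
        "fmt"
        "hash/fnv"
        "sort"
    )

    // Label is a minimal stand-in for cortex's LabelAdapter name/value pair.
    type Label struct {
        Name, Value string
    }

    // shardToken sketches what tokenForLabels produces: a uint32 key derived from
    // the tenant ID and the sorted series labels. This is why Push calls
    // sortLabelsIfNeeded first: a different label order would yield a different
    // token for the same series.
    func shardToken(userID string, labels []Label) uint32 {
        sort.Slice(labels, func(i, j int) bool { return labels[i].Name < labels[j].Name })

        h := fnv.New32a()
        h.Write([]byte(userID))
        for _, l := range labels {
            h.Write([]byte(l.Name))
            h.Write([]byte(l.Value))
        }
        return h.Sum32()
    }

    func main() {
        key := shardToken("tenant-1", []Label{
            {Name: "job", Value: "api"},
            {Name: "__name__", Value: "http_requests_total"},
        })
        fmt.Printf("series key: %d\n", key)
    }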

    DoBatch

    cortex/pkg/ring/batch.go

    // DoBatch request against a set of keys in the ring, handling replication and
    // failures. For example if we want to write N items where they may all
    // hit different ingesters, and we want them all replicated R ways with
    // quorum writes, we track the relationship between batch RPCs and the items
    // within them.
    //
    // Callback is passed the ingester to target, and the indexes of the keys
    // to send to that ingester.
    //
    // Not implemented as a method on Ring so we can test separately.
    func DoBatch(ctx context.Context, op Operation, r ReadRing, keys []uint32, callback func(IngesterDesc, []int) error, cleanup func()) error {
        if r.IngesterCount() <= 0 {
            return fmt.Errorf("DoBatch: IngesterCount <= 0")
        }
        expectedTrackers := len(keys) * (r.ReplicationFactor() + 1) / r.IngesterCount()
        itemTrackers := make([]itemTracker, len(keys))
        ingesters := make(map[string]ingester, r.IngesterCount())
    
        var (
            bufDescs [GetBufferSize]IngesterDesc
            bufHosts [GetBufferSize]string
            bufZones [GetBufferSize]string
        )
        for i, key := range keys {
            replicationSet, err := r.Get(key, op, bufDescs[:0], bufHosts[:0], bufZones[:0])
            if err != nil {
                return err
            }
            itemTrackers[i].minSuccess = len(replicationSet.Ingesters) - replicationSet.MaxErrors
            itemTrackers[i].maxFailures = replicationSet.MaxErrors
    
            for _, desc := range replicationSet.Ingesters {
                curr, found := ingesters[desc.Addr]
                if !found {
                    curr.itemTrackers = make([]*itemTracker, 0, expectedTrackers)
                    curr.indexes = make([]int, 0, expectedTrackers)
                }
                ingesters[desc.Addr] = ingester{
                    desc:         desc,
                    itemTrackers: append(curr.itemTrackers, &itemTrackers[i]),
                    indexes:      append(curr.indexes, i),
                }
            }
        }
    
        tracker := batchTracker{
            done: make(chan struct{}, 1),
            err:  make(chan error, 1),
        }
        tracker.rpcsPending.Store(int32(len(itemTrackers)))
    
        var wg sync.WaitGroup
    
        wg.Add(len(ingesters))
        for _, i := range ingesters {
            go func(i ingester) {
                err := callback(i.desc, i.indexes)
                tracker.record(i.itemTrackers, err)
                wg.Done()
            }(i)
        }
    
        // Perform cleanup at the end.
        go func() {
            wg.Wait()
    
            cleanup()
        }()
    
        select {
        case err := <-tracker.err:
            return err
        case <-tracker.done:
            return nil
        case <-ctx.Done():
            return ctx.Err()
        }
    }
    

    DoBatch accepts a callback that is invoked once per target ingester, passing that ingester's descriptor and the indexes of the keys assigned to it.
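
    What DoBatch ultimately waits for is per-key quorum: each key must be acknowledged by minSuccess of its replicas and may tolerate at most maxFailures errors, and the whole call succeeds only when every key has reached quorum. Since tracker.record is not shown in the excerpt above, here is a simplified, standalone sketch of that bookkeeping; itemState and batch are illustrative stand-ins, not the actual itemTracker/batchTracker implementation.

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    // itemState tracks one key: how many replica RPCs must succeed for it,
    // how many may fail, and how many have done either so far.
    type itemState struct {
        minSuccess  int32
        maxFailures int32
        succeeded   atomic.Int32
        failed      atomic.Int32
    }

    // batch signals completion once every item has reached its success quorum,
    // or failure as soon as any item exceeds its failure budget.
    type batch struct {
        pending atomic.Int32 // items that have not yet reached quorum
        done    chan struct{}
        err     chan error
        once    sync.Once
    }

    // record is called once per ingester RPC with the items carried by that RPC,
    // mirroring tracker.record(i.itemTrackers, err) in DoBatch above.
    func (b *batch) record(items []*itemState, rpcErr error) {
        for _, it := range items {
            if rpcErr != nil {
                if it.failed.Add(1) > it.maxFailures {
                    b.once.Do(func() { b.err <- rpcErr })
                }
                continue
            }
            if it.succeeded.Add(1) == it.minSuccess {
                if b.pending.Add(-1) == 0 {
                    close(b.done)
                }
            }
        }
    }

    func main() {
        item := &itemState{minSuccess: 2, maxFailures: 1}
        b := &batch{done: make(chan struct{}), err: make(chan error, 1)}
        b.pending.Store(1)

        // Two replicas acknowledge the single item: quorum reached.
        b.record([]*itemState{item}, nil)
        b.record([]*itemState{item}, nil)

        select {
        case <-b.done:
            fmt.Println("batch reached quorum")
        case err := <-b.err:
            fmt.Println("batch failed:", err)
        }
    }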

    Query

    cortex/pkg/distributor/query.go

    // Query multiple ingesters and returns a Matrix of samples.
    func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
        var matrix model.Matrix
        err := instrument.CollectedRequest(ctx, "Distributor.Query", queryDuration, instrument.ErrorCode, func(ctx context.Context) error {
            req, err := ingester_client.ToQueryRequest(from, to, matchers)
            if err != nil {
                return err
            }
    
            replicationSet, err := d.GetIngestersForQuery(ctx, matchers...)
            if err != nil {
                return err
            }
    
            matrix, err = d.queryIngesters(ctx, replicationSet, req)
            if err != nil {
                return err
            }
    
            if s := opentracing.SpanFromContext(ctx); s != nil {
                s.LogKV("series", len(matrix))
            }
            return nil
        })
        return matrix, err
    }
    

    The Query method obtains the replicationSet via d.GetIngestersForQuery, then retrieves the matrix via d.queryIngesters.
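
    d.queryIngesters (not shown here) fans the request out to every ingester in the replicationSet and merges the per-ingester responses into a single matrix. Below is a simplified, standalone sketch of that fan-out-and-merge pattern; queryReplica is a hypothetical stand-in for the per-ingester gRPC call, and the real code also tolerates up to replicationSet.MaxErrors failures instead of failing on the first error.

    package main

    import (
        "context"
        "fmt"
        "sync"
    )

    // queryReplica is a hypothetical stand-in for the per-ingester query RPC;
    // in cortex this is a gRPC call to the ingester at addr.
    func queryReplica(ctx context.Context, addr string) ([]string, error) {
        // Replicas of the same shard return overlapping series; the merge below dedupes them.
        return []string{`up{job="api"}`}, nil
    }

    // queryReplicationSet sketches the fan-out done by d.queryIngesters: query
    // every ingester in the replication set concurrently and merge the results.
    func queryReplicationSet(ctx context.Context, addrs []string) ([]string, error) {
        var (
            mu     sync.Mutex
            merged = map[string]struct{}{}
            wg     sync.WaitGroup
            errCh  = make(chan error, len(addrs))
        )

        for _, addr := range addrs {
            wg.Add(1)
            go func(addr string) {
                defer wg.Done()
                series, err := queryReplica(ctx, addr)
                if err != nil {
                    errCh <- err
                    return
                }
                mu.Lock()
                for _, s := range series {
                    merged[s] = struct{}{}
                }
                mu.Unlock()
            }(addr)
        }
        wg.Wait()
        close(errCh)
        if err := <-errCh; err != nil {
            return nil, err
        }

        out := make([]string, 0, len(merged))
        for s := range merged {
            out = append(out, s)
        }
        return out, nil
    }

    func main() {
        series, err := queryReplicationSet(context.Background(), []string{"ingester-1:9095", "ingester-2:9095"})
        fmt.Println(series, err)
    }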

    Summary

    cortex's Distributor provides the Push and Query methods. When shuffle sharding is enabled, Push determines the subRing via d.ingestersRing.ShuffleShard and then submits the keys via ring.DoBatch; Query obtains the replicationSet via d.GetIngestersForQuery and then retrieves the matrix via d.queryIngesters.
