美文网首页
聊聊cortex的ReadRing

聊聊cortex的ReadRing

作者: go4it | 来源:发表于2021-01-26 23:53 被阅读0次

    本文主要研究一下cortex的ReadRing

    ReadRing

    cortex/pkg/ring/ring.go

    // ReadRing represents the read interface to the ring.
    type ReadRing interface {
        prometheus.Collector

        // Get returns n (or more) ingesters which form the replicas for the given key,
        // where n starts at the replication factor and may be extended depending on the
        // state of the selected instances and the operation.
        // bufDescs, bufHosts and bufZones are slices to be overwritten for the return value
        // to avoid memory allocation; can be nil, or created with ring.MakeBuffersForGet().
        Get(key uint32, op Operation, bufDescs []IngesterDesc, bufHosts, bufZones []string) (ReplicationSet, error)

        // GetAllHealthy returns all healthy instances in the ring, for the given operation.
        // This function doesn't check if the quorum is honored, so doesn't fail if the number
        // of unhealthy ingesters is greater than the tolerated max unavailable.
        GetAllHealthy(op Operation) (ReplicationSet, error)

        // GetReplicationSetForOperation returns all instances where the input operation should be executed.
        // The resulting ReplicationSet doesn't necessarily contain all healthy instances
        // in the ring, but could contain the minimum set of instances required to execute
        // the input operation.
        GetReplicationSetForOperation(op Operation) (ReplicationSet, error)

        // ReplicationFactor returns the ring's replication factor
        // (presumably the configured value — confirm in the implementation).
        ReplicationFactor() int

        // IngesterCount returns the number of instances in the ring.
        IngesterCount() int

        // ShuffleShard returns a subring for the provided identifier (eg. a tenant ID)
        // and size (number of instances).
        ShuffleShard(identifier string, size int) ReadRing

        // ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes
        // all instances that have been part of the identifier's shard since "now - lookbackPeriod".
        ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing

        // HasInstance returns whether the ring contains an instance matching the provided instanceID.
        HasInstance(instanceID string) bool
    }
    

    ReadRing内嵌了prometheus.Collector,定义了Get、GetAllHealthy、GetReplicationSetForOperation、ReplicationFactor、IngesterCount、ShuffleShard、ShuffleShardWithLookback、HasInstance方法

    Get

    cortex/pkg/ring/ring.go

    // Get returns n (or more) ingesters which form the replicas for the given key.
    //
    // It walks r.ringTokens starting at the index returned by searchToken for key
    // (presumably the first token >= key — TODO confirm), wrapping around the end
    // of the slice, and collects the owning instance of each token. The walk stops
    // once n distinct instances have been collected or every token has been visited
    // once. n starts at the configured ReplicationFactor and grows by one for each
    // collected instance whose state makes op.ShouldExtendReplicaSetOnState return
    // true. With zone awareness enabled, an instance is skipped when an instance
    // from the same (non-empty) zone has already been collected.
    //
    // The collected instances are passed to r.strategy.Filter, and its result
    // becomes the returned ReplicationSet.
    //
    // bufDescs, bufHosts and bufZones are truncated to length 0 and appended to, so
    // their backing arrays are reused as scratch space. NOTE(review): the returned
    // Ingesters may alias bufDescs depending on what Filter returns — confirm before
    // reusing the buffers while the result is still in use.
    //
    // Errors: ErrEmptyRing when there is no ring descriptor or no tokens,
    // ErrInconsistentTokensInfo when a token has no instance info, or any error
    // returned by r.strategy.Filter.
    func (r *Ring) Get(key uint32, op Operation, bufDescs []IngesterDesc, bufHosts, bufZones []string) (ReplicationSet, error) {
        r.mtx.RLock()
        defer r.mtx.RUnlock()
        if r.ringDesc == nil || len(r.ringTokens) == 0 {
            return ReplicationSet{}, ErrEmptyRing
        }
    
        // The caller-supplied buffers are truncated to length 0 so that appends reuse
        // their capacity; a nil buffer simply behaves like an empty slice.
        var (
            n          = r.cfg.ReplicationFactor
            ingesters  = bufDescs[:0]
            start      = searchToken(r.ringTokens, key)
            iterations = 0
    
            // We use a slice instead of a map because it's faster to search within a
            // slice than lookup a map for a very low number of items.
            distinctHosts = bufHosts[:0]
            distinctZones = bufZones[:0]
        )
        // Stop once n distinct instances are collected or every token has been
        // visited exactly once; note that n may grow inside the loop body.
        for i := start; len(distinctHosts) < n && iterations < len(r.ringTokens); i++ {
            iterations++
            // Wrap i around in the ring.
            i %= len(r.ringTokens)
            token := r.ringTokens[i]
    
            info, ok := r.ringInstanceByToken[token]
            if !ok {
                // This should never happen unless a bug in the ring code.
                return ReplicationSet{}, ErrInconsistentTokensInfo
            }
    
            // We want n *distinct* ingesters && distinct zones.
            if util.StringsContain(distinctHosts, info.InstanceID) {
                continue
            }
    
            // Ignore if the ingesters don't have a zone set.
            if r.cfg.ZoneAwarenessEnabled && info.Zone != "" {
                if util.StringsContain(distinctZones, info.Zone) {
                    continue
                }
                distinctZones = append(distinctZones, info.Zone)
            }
    
            distinctHosts = append(distinctHosts, info.InstanceID)
            // No presence check: if InstanceID were missing from the map this would
            // yield a zero-value IngesterDesc.
            ingester := r.ringDesc.Ingesters[info.InstanceID]
    
            // Check whether the replica set should be extended given we're including
            // this instance.
            if op.ShouldExtendReplicaSetOnState(ingester.State) {
                n++
            }
    
            ingesters = append(ingesters, ingester)
        }
    
        // Filter receives the configured replication factor, not the possibly
        // extended n used for the walk above.
        liveIngesters, maxFailure, err := r.strategy.Filter(ingesters, op, r.cfg.ReplicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled)
        if err != nil {
            return ReplicationSet{}, err
        }
    
        return ReplicationSet{
            Ingesters: liveIngesters,
            MaxErrors: maxFailure,
        }, nil
    }
    

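    Get方法先通过searchToken(r.ringTokens, key)定位key在ring上的起始位置,然后沿ring依次遍历token(到末尾后回绕):通过r.ringInstanceByToken[token]获取info,再通过r.ringDesc.Ingesters[info.InstanceID]获取ingester。遍历时会跳过已选中的实例,开启zone awareness时还会跳过已出现过的zone。当实例状态满足op.ShouldExtendReplicaSetOnState时,需要的副本数n加1。遍历在凑够n个不同实例或遍历完所有token后结束,最后通过r.strategy.Filter过滤出liveIngesters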

    GetAllHealthy

    cortex/pkg/ring/ring.go

    // GetAllHealthy implements ReadRing.
    //
    // It returns every instance that r.IsHealthy reports as healthy for op, evaluated
    // against a single timestamp taken at call time. No quorum check is performed:
    // MaxErrors is always zero. ErrEmptyRing is returned when the ring descriptor is
    // nil or contains no instances.
    func (r *Ring) GetAllHealthy(op Operation) (ReplicationSet, error) {
        r.mtx.RLock()
        defer r.mtx.RUnlock()

        if r.ringDesc == nil || len(r.ringDesc.Ingesters) == 0 {
            return ReplicationSet{}, ErrEmptyRing
        }

        var (
            now     = time.Now()
            healthy = make([]IngesterDesc, 0, len(r.ringDesc.Ingesters))
        )
        for _, desc := range r.ringDesc.Ingesters {
            if !r.IsHealthy(&desc, op, now) {
                continue
            }
            healthy = append(healthy, desc)
        }

        // MaxErrors is left at its zero value: the caller gets all healthy
        // instances and no failure budget.
        return ReplicationSet{Ingesters: healthy}, nil
    }
    

    GetAllHealthy方法遍历r.ringDesc.Ingesters,然后通过r.IsHealthy(&ingester, op, now)提取healthy的ingester

    GetReplicationSetForOperation

    cortex/pkg/ring/ring.go

    // GetReplicationSetForOperation implements ReadRing.
    //
    // It collects every instance that is healthy for op, then applies one of two
    // failure-tolerance policies:
    //   - without zone awareness, quorum is computed over max(#instances, RF)
    //     instances minus RF/2, and the leftover slack is reported as MaxErrors;
    //   - with zone awareness, every instance from a zone containing at least one
    //     unhealthy instance is dropped, and the remaining tolerance is reported as
    //     MaxUnavailableZones.
    // ErrEmptyRing is returned when the ring has no descriptor or no tokens, and
    // ErrTooManyFailedIngesters when the quorum cannot be met.
    func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, error) {
        r.mtx.RLock()
        defer r.mtx.RUnlock()

        if r.ringDesc == nil || len(r.ringTokens) == 0 {
            return ReplicationSet{}, ErrEmptyRing
        }

        now := time.Now()
        healthy := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters))
        // Set of zones containing at least one unhealthy instance.
        failedZones := make(map[string]struct{})
        for _, desc := range r.ringDesc.Ingesters {
            if !r.IsHealthy(&desc, op, now) {
                failedZones[desc.Zone] = struct{}{}
                continue
            }
            healthy = append(healthy, desc)
        }

        if !r.cfg.ZoneAwarenessEnabled {
            // Require at least RF instances in the computation, then tolerate RF/2
            // failures (e.g. at least RF-1 instances when RF=3).
            required := r.cfg.ReplicationFactor
            if total := len(r.ringDesc.Ingesters); total > required {
                required = total
            }
            required -= r.cfg.ReplicationFactor / 2

            if len(healthy) < required {
                return ReplicationSet{}, ErrTooManyFailedIngesters
            }

            return ReplicationSet{
                Ingesters:           healthy,
                MaxErrors:           len(healthy) - required,
                MaxUnavailableZones: 0,
            }, nil
        }

        // Data is replicated to RF different zones, but the ring may currently span
        // fewer zones than RF. A majority of the replicated zones must succeed, so
        // up to half of them (rounded down) may be unavailable.
        replicatedZones := util.Min(len(r.ringZones), r.cfg.ReplicationFactor)
        tolerableZones := replicatedZones / 2

        if len(failedZones) > tolerableZones {
            return ReplicationSet{}, ErrTooManyFailedIngesters
        }

        if len(failedZones) > 0 {
            // There is no benefit in querying healthy instances of a failing zone:
            // a zone is considered failed as soon as one of its instances is unhealthy.
            kept := make([]IngesterDesc, 0, len(r.ringDesc.Ingesters))
            for _, desc := range healthy {
                if _, failed := failedZones[desc.Zone]; !failed {
                    kept = append(kept, desc)
                }
            }
            healthy = kept
        }

        // The failing zones were already dropped above, so they consume part of
        // the zone budget.
        return ReplicationSet{
            Ingesters:           healthy,
            MaxErrors:           0,
            MaxUnavailableZones: tolerableZones - len(failedZones),
        }, nil
    }
    

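    GetReplicationSetForOperation先提取healthyInstances,然后根据r.cfg.ZoneAwarenessEnabled分两种情况处理。开启zone awareness时:若失败的zone数超过可容忍值,返回ErrTooManyFailedIngesters;否则剔除所有包含不健康实例的zone中的全部实例,并计算剩余可容忍的MaxUnavailableZones。未开启时:根据ReplicationFactor计算所需的最少健康实例数,不满足则返回ErrTooManyFailedIngesters,满足则计算可容忍的MaxErrors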

    ShuffleShard

    cortex/pkg/ring/ring.go

    // ShuffleShard implements ReadRing.
    //
    // It returns the subring of the given size selected for identifier. The ring
    // itself is returned when size is not positive or when the ring does not hold
    // more than size instances. Otherwise the subring is looked up with
    // getCachedShuffledSubring, computed with shuffleShard on a miss, and stored
    // with setCachedShuffledSubring keyed by (identifier, size).
    func (r *Ring) ShuffleShard(identifier string, size int) ReadRing {
        // Nothing to shard when the requested size is not smaller than the ring.
        if size <= 0 || size >= r.IngesterCount() {
            return r
        }

        cached := r.getCachedShuffledSubring(identifier, size)
        if cached != nil {
            return cached
        }

        subring := r.shuffleShard(identifier, size, 0, time.Now())
        r.setCachedShuffledSubring(identifier, size, subring)

        return subring
    }
    

    ShuffleShard方法在size<=0或ring中的实例数不大于size时直接返回r本身;否则先从r.getCachedShuffledSubring获取缓存,如果为nil则执行r.shuffleShard计算子ring,再通过r.setCachedShuffledSubring缓存结果

    HasInstance

    cortex/pkg/ring/ring.go

    // HasInstance reports whether the ring descriptor contains an instance whose
    // ID equals instanceID. The lookup is performed under the ring's read lock.
    func (r *Ring) HasInstance(instanceID string) bool {
        r.mtx.RLock()
        defer r.mtx.RUnlock()

        _, found := r.ringDesc.GetIngesters()[instanceID]
        return found
    }
    

    HasInstance通过r.ringDesc.GetIngesters()获取instances,再根据instanceID判断是否存在

    小结

    cortex的ReadRing内嵌了prometheus.Collector,定义了Get、GetAllHealthy、GetReplicationSetForOperation、ReplicationFactor、IngesterCount、ShuffleShard、ShuffleShardWithLookback、HasInstance方法。

    doc

    相关文章

      网友评论

          本文标题:聊聊cortex的ReadRing

          本文链接:https://www.haomeiwen.com/subject/mfcozktx.html