k8s hpa controller: a brief source code analysis

Author: wwq2020 | Published 2020-07-21 19:49

    Introduction

    Depending on the HorizontalPodAutoscalerUseRESTClients setting, the hpa controller fetches metrics either through
    the metrics APIs (metrics server) or through the legacy Heapster client. Based on the observed resource usage it
    scales the target workload out or in, calling the apiserver to update the target's scale subresource.
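
    The decision itself boils down to the ratio between an observed metric value and its target. A minimal sketch of
    the documented scaling rule, tolerance band included (the function below is illustrative, not the controller's
    own code):

    package main

    import (
        "fmt"
        "math"
    )

    // desiredReplicas models the documented HPA rule:
    //   desired = ceil(current * currentMetric / targetMetric)
    // with no change when the usage ratio falls inside the tolerance band.
    func desiredReplicas(current int32, currentMetric, targetMetric, tolerance float64) int32 {
        ratio := currentMetric / targetMetric
        if math.Abs(1.0-ratio) <= tolerance {
            return current // the change would be too small to act on
        }
        return int32(math.Ceil(ratio * float64(current)))
    }

    func main() {
        // 4 replicas at 90% average CPU against a 60% target -> 6 replicas.
        fmt.Println(desiredReplicas(4, 90, 60, 0.1))
    }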

    Where the REST storage is created

    func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, error) {
        store := &genericregistry.Store{
            NewFunc:                  func() runtime.Object { return &autoscaling.HorizontalPodAutoscaler{} },
            NewListFunc:              func() runtime.Object { return &autoscaling.HorizontalPodAutoscalerList{} },
            DefaultQualifiedResource: autoscaling.Resource("horizontalpodautoscalers"),
    
            CreateStrategy: horizontalpodautoscaler.Strategy,
            UpdateStrategy: horizontalpodautoscaler.Strategy,
            DeleteStrategy: horizontalpodautoscaler.Strategy,
    
            TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
        }
        options := &generic.StoreOptions{RESTOptions: optsGetter}
        if err := store.CompleteWithOptions(options); err != nil {
            return nil, nil, err
        }
    
        statusStore := *store
        statusStore.UpdateStrategy = horizontalpodautoscaler.StatusStrategy
        return &REST{store}, &StatusREST{store: &statusStore}, nil
    }
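
    Note that statusStore is a copy of the main store that only swaps in horizontalpodautoscaler.StatusStrategy, so
    updates arriving through the status subresource can touch .status but leave the spec alone.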
    

    controller

    In cmd/kube-controller-manager/app/autoscaling.go:

    
    func startHPAController(ctx ControllerContext) (http.Handler, bool, error) {
        if !ctx.AvailableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] {
            return nil, false, nil
        }
    
        if ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerUseRESTClients {
            // use the new-style clients if support for custom metrics is enabled
            return startHPAControllerWithRESTClient(ctx)
        }
    
        return startHPAControllerWithLegacyClient(ctx)
    }
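
    HorizontalPodAutoscalerUseRESTClients defaults to true in recent releases, so the REST path below is the one
    normally taken; the legacy path pulls metrics from Heapster, which has since been deprecated.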
    
    func startHPAControllerWithRESTClient(ctx ControllerContext) (http.Handler, bool, error) {
        clientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
        hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
    
        apiVersionsGetter := custom_metrics.NewAvailableAPIsGetter(hpaClient.Discovery())
        // invalidate the discovery information roughly once per resync interval, so our API
        // information is *at most* two resync intervals old.
        go custom_metrics.PeriodicallyInvalidate(
            apiVersionsGetter,
            ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
            ctx.Stop)
    
        metricsClient := metrics.NewRESTMetricsClient(
            resourceclient.NewForConfigOrDie(clientConfig),
            custom_metrics.NewForConfig(clientConfig, ctx.RESTMapper, apiVersionsGetter),
            external_metrics.NewForConfigOrDie(clientConfig),
        )
        return startHPAControllerWithMetricsClient(ctx, metricsClient)
    }
    
    
    func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient metrics.MetricsClient) (http.Handler, bool, error) {
        hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
        hpaClientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
    
        // we don't use cached discovery because DiscoveryScaleKindResolver does its own caching,
        // so we want to re-fetch every time when we actually ask for it
        scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery())
        scaleClient, err := scale.NewForConfig(hpaClientConfig, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
        if err != nil {
            return nil, false, err
        }
    
        go podautoscaler.NewHorizontalController(
            hpaClient.CoreV1(),
            scaleClient,
            hpaClient.AutoscalingV1(),
            ctx.RESTMapper,
            metricsClient,
            ctx.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),
            ctx.InformerFactory.Core().V1().Pods(),
            ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
            ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration,
            ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance,
            ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration,
            ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration,
        ).Run(ctx.Stop)
        return nil, true, nil
    }
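
    The durations and tolerance handed to NewHorizontalController come from kube-controller-manager flags such as
    --horizontal-pod-autoscaler-sync-period, --horizontal-pod-autoscaler-downscale-stabilization and
    --horizontal-pod-autoscaler-tolerance, so the controller's cadence and sensitivity are tunable per cluster.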
    
    
    

    In pkg/controller/podautoscaler/horizontal.go:

    func NewHorizontalController(
        evtNamespacer v1core.EventsGetter,
        scaleNamespacer scaleclient.ScalesGetter,
        hpaNamespacer autoscalingclient.HorizontalPodAutoscalersGetter,
        mapper apimeta.RESTMapper,
        metricsClient metricsclient.MetricsClient,
        hpaInformer autoscalinginformers.HorizontalPodAutoscalerInformer,
        podInformer coreinformers.PodInformer,
        resyncPeriod time.Duration,
        downscaleStabilisationWindow time.Duration,
        tolerance float64,
        cpuInitializationPeriod,
        delayOfInitialReadinessStatus time.Duration,
    
    ) *HorizontalController {
        broadcaster := record.NewBroadcaster()
        broadcaster.StartStructuredLogging(0)
        broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: evtNamespacer.Events("")})
        recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "horizontal-pod-autoscaler"})
    
        hpaController := &HorizontalController{
            eventRecorder:                recorder,
            scaleNamespacer:              scaleNamespacer,
            hpaNamespacer:                hpaNamespacer,
            downscaleStabilisationWindow: downscaleStabilisationWindow,
            queue:                        workqueue.NewNamedRateLimitingQueue(NewDefaultHPARateLimiter(resyncPeriod), "horizontalpodautoscaler"),
            mapper:                       mapper,
            recommendations:              map[string][]timestampedRecommendation{},
            scaleUpEvents:                map[string][]timestampedScaleEvent{},
            scaleDownEvents:              map[string][]timestampedScaleEvent{},
        }
    
        hpaInformer.Informer().AddEventHandlerWithResyncPeriod(
            cache.ResourceEventHandlerFuncs{
                AddFunc:    hpaController.enqueueHPA,
                UpdateFunc: hpaController.updateHPA,
                DeleteFunc: hpaController.deleteHPA,
            },
            resyncPeriod,
        )
        hpaController.hpaLister = hpaInformer.Lister()
        hpaController.hpaListerSynced = hpaInformer.Informer().HasSynced
    
        hpaController.podLister = podInformer.Lister()
        hpaController.podListerSynced = podInformer.Informer().HasSynced
    
        replicaCalc := NewReplicaCalculator(
            metricsClient,
            hpaController.podLister,
            tolerance,
            cpuInitializationPeriod,
            delayOfInitialReadinessStatus,
        )
        hpaController.replicaCalc = replicaCalc
    
        return hpaController
    }
    
    func (a *HorizontalController) Run(stopCh <-chan struct{}) {
        defer utilruntime.HandleCrash()
        defer a.queue.ShutDown()
    
        klog.Infof("Starting HPA controller")
        defer klog.Infof("Shutting down HPA controller")
    
        if !cache.WaitForNamedCacheSync("HPA", stopCh, a.hpaListerSynced, a.podListerSynced) {
            return
        }
    
        // start a single worker (we may wish to start more in the future)
        go wait.Until(a.worker, time.Second, stopCh)
    
        <-stopCh
    }
    
    
    func (a *HorizontalController) worker() {
        for a.processNextWorkItem() {
        }
        klog.Infof("horizontal pod autoscaler controller worker shutting down")
    }
    
    func (a *HorizontalController) processNextWorkItem() bool {
        key, quit := a.queue.Get()
        if quit {
            return false
        }
        defer a.queue.Done(key)
    
        deleted, err := a.reconcileKey(key.(string))
        if err != nil {
            utilruntime.HandleError(err)
        }
        // Add the request that processed this HPA back to the queue with a resyncPeriod delay.
        // Requests are always added to the queue with a resyncPeriod delay. If there's already a
        // request for the HPA in the queue then the new request is dropped. Requests spend
        // resyncPeriod in the queue, so HPAs are processed every resyncPeriod.
        // The request is re-added here just in case the last resync didn't insert one into the
        // queue. This happens quite often because there is a race between adding a request after
        // resyncPeriod and removing it from the queue: a request can be added by resync before the
        // previous one is removed. If we didn't re-add the request here, one request would be
        // dropped in that case and the HPA would be processed after 2 x resyncPeriod.
        if !deleted {
            a.queue.AddRateLimited(key)
        }
    
        return true
    }
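
    The AddRateLimited call above is what turns the worker into a periodic resync loop: NewDefaultHPARateLimiter
    builds a fixed-interval limiter, so every re-enqueued key waits exactly resyncPeriod rather than backing off
    exponentially. A simplified stand-in for that limiter (illustrative, not the package's own type):

    package main

    import (
        "time"

        "k8s.io/client-go/util/workqueue"
    )

    // fixedIntervalRateLimiter waits exactly `interval` for every item,
    // no matter how many times the item has been re-enqueued.
    type fixedIntervalRateLimiter struct {
        interval time.Duration
    }

    func (r *fixedIntervalRateLimiter) When(item interface{}) time.Duration { return r.interval }
    func (r *fixedIntervalRateLimiter) Forget(item interface{})             {}
    func (r *fixedIntervalRateLimiter) NumRequeues(item interface{}) int    { return 0 }

    func main() {
        // With such a limiter, AddRateLimited(key) behaves like AddAfter(key, interval),
        // which is why re-adding the key after each reconcile yields one pass per resyncPeriod.
        q := workqueue.NewNamedRateLimitingQueue(&fixedIntervalRateLimiter{interval: 15 * time.Second}, "demo")
        q.AddRateLimited("default/my-hpa")
        q.ShutDown()
    }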
    
    
    
    func (a *HorizontalController) reconcileKey(key string) (deleted bool, err error) {
        namespace, name, err := cache.SplitMetaNamespaceKey(key)
        if err != nil {
            return true, err
        }
    
        hpa, err := a.hpaLister.HorizontalPodAutoscalers(namespace).Get(name)
        if errors.IsNotFound(err) {
            klog.Infof("Horizontal Pod Autoscaler %s has been deleted in %s", name, namespace)
            delete(a.recommendations, key)
            delete(a.scaleUpEvents, key)
            delete(a.scaleDownEvents, key)
            return true, nil
        }
        if err != nil {
            return false, err
        }
    
        return false, a.reconcileAutoscaler(hpa, key)
    }
    
    
    func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.HorizontalPodAutoscaler, key string) error {
        // make a copy so that we never mutate the shared informer cache (conversion can mutate the object)
        hpav1 := hpav1Shared.DeepCopy()
        // then, convert to autoscaling/v2, which makes our lives easier when calculating metrics
        hpaRaw, err := unsafeConvertToVersionVia(hpav1, autoscalingv2.SchemeGroupVersion)
        if err != nil {
            a.eventRecorder.Event(hpav1, v1.EventTypeWarning, "FailedConvertHPA", err.Error())
            return fmt.Errorf("failed to convert the given HPA to %s: %v", autoscalingv2.SchemeGroupVersion.String(), err)
        }
        hpa := hpaRaw.(*autoscalingv2.HorizontalPodAutoscaler)
        hpaStatusOriginal := hpa.Status.DeepCopy()
    
        reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name)
    
        targetGV, err := schema.ParseGroupVersion(hpa.Spec.ScaleTargetRef.APIVersion)
        if err != nil {
            a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
            setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
            a.updateStatusIfNeeded(hpaStatusOriginal, hpa)
            return fmt.Errorf("invalid API version in scale target reference: %v", err)
        }
    
        targetGK := schema.GroupKind{
            Group: targetGV.Group,
            Kind:  hpa.Spec.ScaleTargetRef.Kind,
        }
    
        mappings, err := a.mapper.RESTMappings(targetGK)
        if err != nil {
            a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
            setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
            a.updateStatusIfNeeded(hpaStatusOriginal, hpa)
            return fmt.Errorf("unable to determine resource for scale target reference: %v", err)
        }
    
        scale, targetGR, err := a.scaleForResourceMappings(hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, mappings)
        if err != nil {
            a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
            setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
            a.updateStatusIfNeeded(hpaStatusOriginal, hpa)
            return fmt.Errorf("failed to query scale subresource for %s: %v", reference, err)
        }
        setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededGetScale", "the HPA controller was able to get the target's current scale")
        currentReplicas := scale.Spec.Replicas
        a.recordInitialRecommendation(currentReplicas, key)
    
        var (
            metricStatuses        []autoscalingv2.MetricStatus
            metricDesiredReplicas int32
            metricName            string
        )
    
        desiredReplicas := int32(0)
        rescaleReason := ""
    
        var minReplicas int32
    
        if hpa.Spec.MinReplicas != nil {
            minReplicas = *hpa.Spec.MinReplicas
        } else {
            // Default value
            minReplicas = 1
        }
    
        rescale := true
    
        if scale.Spec.Replicas == 0 && minReplicas != 0 {
            // Autoscaling is disabled for this resource
            desiredReplicas = 0
            rescale = false
            setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "ScalingDisabled", "scaling is disabled since the replica count of the target is zero")
        } else if currentReplicas > hpa.Spec.MaxReplicas {
            rescaleReason = "Current number of replicas above Spec.MaxReplicas"
            desiredReplicas = hpa.Spec.MaxReplicas
        } else if currentReplicas < minReplicas {
            rescaleReason = "Current number of replicas below Spec.MinReplicas"
            desiredReplicas = minReplicas
        } else {
            var metricTimestamp time.Time
            metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(hpa, scale, hpa.Spec.Metrics)
            if err != nil {
                a.setCurrentReplicasInStatus(hpa, currentReplicas)
                if err := a.updateStatusIfNeeded(hpaStatusOriginal, hpa); err != nil {
                    utilruntime.HandleError(err)
                }
                a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedComputeMetricsReplicas", err.Error())
                return fmt.Errorf("failed to compute desired number of replicas based on listed metrics for %s: %v", reference, err)
            }
    
            klog.V(4).Infof("proposing %v desired replicas (based on %s from %s) for %s", metricDesiredReplicas, metricName, metricTimestamp, reference)
    
            rescaleMetric := ""
            if metricDesiredReplicas > desiredReplicas {
                desiredReplicas = metricDesiredReplicas
                rescaleMetric = metricName
            }
            if desiredReplicas > currentReplicas {
                rescaleReason = fmt.Sprintf("%s above target", rescaleMetric)
            }
            if desiredReplicas < currentReplicas {
                rescaleReason = "All metrics below target"
            }
            if hpa.Spec.Behavior == nil {
                desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas, minReplicas)
            } else {
                desiredReplicas = a.normalizeDesiredReplicasWithBehaviors(hpa, key, currentReplicas, desiredReplicas, minReplicas)
            }
            rescale = desiredReplicas != currentReplicas
        }
    
        if rescale {
            scale.Spec.Replicas = desiredReplicas
            _, err = a.scaleNamespacer.Scales(hpa.Namespace).Update(context.TODO(), targetGR, scale, metav1.UpdateOptions{})
            if err != nil {
                a.eventRecorder.Eventf(hpa, v1.EventTypeWarning, "FailedRescale", "New size: %d; reason: %s; error: %v", desiredReplicas, rescaleReason, err.Error())
                setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedUpdateScale", "the HPA controller was unable to update the target scale: %v", err)
                a.setCurrentReplicasInStatus(hpa, currentReplicas)
                if err := a.updateStatusIfNeeded(hpaStatusOriginal, hpa); err != nil {
                    utilruntime.HandleError(err)
                }
                return fmt.Errorf("failed to rescale %s: %v", reference, err)
            }
            setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededRescale", "the HPA controller was able to update the target scale to %d", desiredReplicas)
            a.eventRecorder.Eventf(hpa, v1.EventTypeNormal, "SuccessfulRescale", "New size: %d; reason: %s", desiredReplicas, rescaleReason)
            a.storeScaleEvent(hpa.Spec.Behavior, key, currentReplicas, desiredReplicas)
            klog.Infof("Successful rescale of %s, old size: %d, new size: %d, reason: %s",
                hpa.Name, currentReplicas, desiredReplicas, rescaleReason)
        } else {
            klog.V(4).Infof("decided not to scale %s to %v (last scale time was %s)", reference, desiredReplicas, hpa.Status.LastScaleTime)
            desiredReplicas = currentReplicas
        }
    
        a.setStatus(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)
        return a.updateStatusIfNeeded(hpaStatusOriginal, hpa)
    }
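
    normalizeDesiredReplicas (not shown) is where the downscale stabilization window comes in: the controller keeps
    recent recommendations per HPA and, to avoid flapping, acts on the highest recommendation still inside the window
    before clamping the result to [minReplicas, maxReplicas]. A simplified sketch of that idea (the helper below is
    illustrative, not the controller's own code):

    package main

    import (
        "fmt"
        "time"
    )

    type timestampedRecommendation struct {
        recommendation int32
        timestamp      time.Time
    }

    // stabilizedReplicas mimics downscale stabilization: take the maximum of the
    // new proposal and every recommendation still inside the window, then clamp
    // to the [min, max] bounds from the HPA spec.
    func stabilizedReplicas(history []timestampedRecommendation, proposed, min, max int32, window time.Duration) int32 {
        cutoff := time.Now().Add(-window)
        stabilized := proposed
        for _, rec := range history {
            if rec.timestamp.After(cutoff) && rec.recommendation > stabilized {
                stabilized = rec.recommendation
            }
        }
        if stabilized < min {
            stabilized = min
        }
        if stabilized > max {
            stabilized = max
        }
        return stabilized
    }

    func main() {
        now := time.Now()
        history := []timestampedRecommendation{
            {8, now.Add(-2 * time.Minute)},  // still inside a 5m window
            {3, now.Add(-10 * time.Minute)}, // already outside it
        }
        // metrics propose 4, but the 8 seen 2 minutes ago wins: no flapping down.
        fmt.Println(stabilizedReplicas(history, 4, 1, 10, 5*time.Minute))
    }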
    
    func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale,
        metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) {
    
        if scale.Status.Selector == "" {
            errMsg := "selector is required"
            a.eventRecorder.Event(hpa, v1.EventTypeWarning, "SelectorRequired", errMsg)
            setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "InvalidSelector", "the HPA target's scale is missing a selector")
            return 0, "", nil, time.Time{}, fmt.Errorf(errMsg)
        }
    
        selector, err := labels.Parse(scale.Status.Selector)
        if err != nil {
            errMsg := fmt.Sprintf("couldn't convert selector into a corresponding internal selector object: %v", err)
            a.eventRecorder.Event(hpa, v1.EventTypeWarning, "InvalidSelector", errMsg)
            setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "InvalidSelector", errMsg)
            return 0, "", nil, time.Time{}, fmt.Errorf(errMsg)
        }
    
        specReplicas := scale.Spec.Replicas
        statusReplicas := scale.Status.Replicas
        statuses = make([]autoscalingv2.MetricStatus, len(metricSpecs))
    
        invalidMetricsCount := 0
        var invalidMetricError error
        var invalidMetricCondition autoscalingv2.HorizontalPodAutoscalerCondition
    
        for i, metricSpec := range metricSpecs {
            replicaCountProposal, metricNameProposal, timestampProposal, condition, err := a.computeReplicasForMetric(hpa, metricSpec, specReplicas, statusReplicas, selector, &statuses[i])
    
            if err != nil {
                if invalidMetricsCount <= 0 {
                    invalidMetricCondition = condition
                    invalidMetricError = err
                }
                invalidMetricsCount++
            }
            if err == nil && (replicas == 0 || replicaCountProposal > replicas) {
                timestamp = timestampProposal
                replicas = replicaCountProposal
                metric = metricNameProposal
            }
        }
    
        // If all metrics are invalid return error and set condition on hpa based on first invalid metric.
        if invalidMetricsCount >= len(metricSpecs) {
            setCondition(hpa, invalidMetricCondition.Type, invalidMetricCondition.Status, invalidMetricCondition.Reason, invalidMetricCondition.Message)
            return 0, "", statuses, time.Time{}, fmt.Errorf("invalid metrics (%v invalid out of %v), first error is: %v", invalidMetricsCount, len(metricSpecs), invalidMetricError)
        }
        setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionTrue, "ValidMetricFound", "the HPA was able to successfully calculate a replica count from %s", metric)
        return replicas, metric, statuses, timestamp, nil
    }
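
    Two details of the loop above are worth calling out: across multiple metric specs the largest replica proposal
    always wins, and a metric that fails to compute only aborts reconciliation when every metric is invalid; as long
    as one metric yields a proposal, scaling proceeds on it.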
    
    func (a *HorizontalController) computeReplicasForMetric(hpa *autoscalingv2.HorizontalPodAutoscaler, spec autoscalingv2.MetricSpec,
        specReplicas, statusReplicas int32, selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, metricNameProposal string,
        timestampProposal time.Time, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
    
        switch spec.Type {
        case autoscalingv2.ObjectMetricSourceType:
            metricSelector, err := metav1.LabelSelectorAsSelector(spec.Object.Metric.Selector)
            if err != nil {
                condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
                return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err)
            }
            replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForObjectMetric(specReplicas, statusReplicas, spec, hpa, selector, status, metricSelector)
            if err != nil {
                return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err)
            }
        case autoscalingv2.PodsMetricSourceType:
            metricSelector, err := metav1.LabelSelectorAsSelector(spec.Pods.Metric.Selector)
            if err != nil {
                condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
                return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
            }
            replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForPodsMetric(specReplicas, spec, hpa, selector, status, metricSelector)
            if err != nil {
                return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
            }
        case autoscalingv2.ResourceMetricSourceType:
            replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForResourceMetric(specReplicas, spec, hpa, selector, status)
            if err != nil {
                return 0, "", time.Time{}, condition, err
            }
        case autoscalingv2.ExternalMetricSourceType:
            replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForExternalMetric(specReplicas, statusReplicas, spec, hpa, selector, status)
            if err != nil {
                return 0, "", time.Time{}, condition, err
            }
        default:
            errMsg := fmt.Sprintf("unknown metric source type %q", string(spec.Type))
            err = fmt.Errorf(errMsg)
            condition := a.getUnableComputeReplicaCountCondition(hpa, "InvalidMetricSourceType", err)
            return 0, "", time.Time{}, condition, err
        }
        return replicaCountProposal, metricNameProposal, timestampProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
    }
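
    The four source types map onto the metrics APIs wired up in startHPAControllerWithRESTClient: Resource metrics
    come from metrics.k8s.io (metrics server), Pods and Object metrics from custom.metrics.k8s.io, and External
    metrics from external.metrics.k8s.io.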
    
    func (a *HorizontalController) computeStatusForObjectMetric(specReplicas, statusReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler, selector labels.Selector, status *autoscalingv2.MetricStatus, metricSelector labels.Selector) (replicas int32, timestamp time.Time, metricName string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
        if metricSpec.Object.Target.Type == autoscalingv2.ValueMetricType {
            replicaCountProposal, utilizationProposal, timestampProposal, err := a.replicaCalc.GetObjectMetricReplicas(specReplicas, metricSpec.Object.Target.Value.MilliValue(), metricSpec.Object.Metric.Name, hpa.Namespace, &metricSpec.Object.DescribedObject, selector, metricSelector)
            if err != nil {
                condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
                return 0, timestampProposal, "", condition, err
            }
            *status = autoscalingv2.MetricStatus{
                Type: autoscalingv2.ObjectMetricSourceType,
                Object: &autoscalingv2.ObjectMetricStatus{
                    DescribedObject: metricSpec.Object.DescribedObject,
                    Metric: autoscalingv2.MetricIdentifier{
                        Name:     metricSpec.Object.Metric.Name,
                        Selector: metricSpec.Object.Metric.Selector,
                    },
                    Current: autoscalingv2.MetricValueStatus{
                        Value: resource.NewMilliQuantity(utilizationProposal, resource.DecimalSI),
                    },
                },
            }
            return replicaCountProposal, timestampProposal, fmt.Sprintf("%s metric %s", metricSpec.Object.DescribedObject.Kind, metricSpec.Object.Metric.Name), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
        } else if metricSpec.Object.Target.Type == autoscalingv2.AverageValueMetricType {
            replicaCountProposal, utilizationProposal, timestampProposal, err := a.replicaCalc.GetObjectPerPodMetricReplicas(statusReplicas, metricSpec.Object.Target.AverageValue.MilliValue(), metricSpec.Object.Metric.Name, hpa.Namespace, &metricSpec.Object.DescribedObject, metricSelector)
            if err != nil {
                condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
                return 0, time.Time{}, "", condition, fmt.Errorf("failed to get %s object metric: %v", metricSpec.Object.Metric.Name, err)
            }
            *status = autoscalingv2.MetricStatus{
                Type: autoscalingv2.ObjectMetricSourceType,
                Object: &autoscalingv2.ObjectMetricStatus{
                    Metric: autoscalingv2.MetricIdentifier{
                        Name:     metricSpec.Object.Metric.Name,
                        Selector: metricSpec.Object.Metric.Selector,
                    },
                    Current: autoscalingv2.MetricValueStatus{
                        AverageValue: resource.NewMilliQuantity(utilizationProposal, resource.DecimalSI),
                    },
                },
            }
            return replicaCountProposal, timestampProposal, fmt.Sprintf("external metric %s(%+v)", metricSpec.Object.Metric.Name, metricSpec.Object.Metric.Selector), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
        }
        errMsg := "invalid object metric source: neither a value target nor an average value target was set"
        err = fmt.Errorf(errMsg)
        condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
        return 0, time.Time{}, "", condition, err
    }
    
    

    In pkg/controller/podautoscaler/replica_calculator.go:

    func (c *ReplicaCalculator) GetObjectMetricReplicas(currentReplicas int32, targetUtilization int64, metricName string, namespace string, objectRef *autoscaling.CrossVersionObjectReference, selector labels.Selector, metricSelector labels.Selector) (replicaCount int32, utilization int64, timestamp time.Time, err error) {
        utilization, timestamp, err = c.metricsClient.GetObjectMetric(metricName, namespace, objectRef, metricSelector)
        if err != nil {
            return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s on %s %s/%s: %v", metricName, objectRef.Kind, namespace, objectRef.Name, err)
        }
    
        usageRatio := float64(utilization) / float64(targetUtilization)
        replicaCount, timestamp, err = c.getUsageRatioReplicaCount(currentReplicas, usageRatio, namespace, selector)
        return replicaCount, utilization, timestamp, err
    }
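
    getUsageRatioReplicaCount (not shown) is where the tolerance configured on the ReplicaCalculator is applied: a
    usage ratio within tolerance of 1.0 keeps the current replica count, otherwise the ratio is multiplied by the
    number of ready pods and rounded up. A simplified model of that logic (illustrative, not the package's own code):

    package main

    import (
        "fmt"
        "math"
    )

    // usageRatioReplicaCount models getUsageRatioReplicaCount: inside the
    // tolerance band nothing changes; outside it, scale the ready-pod count
    // by the usage ratio and round up.
    func usageRatioReplicaCount(currentReplicas int32, usageRatio, tolerance float64, readyPods int) int32 {
        if math.Abs(1.0-usageRatio) <= tolerance {
            return currentReplicas // change too small to act on
        }
        return int32(math.Ceil(usageRatio * float64(readyPods)))
    }

    func main() {
        // ratio 1.05 with the default 0.1 tolerance: stay at 5 replicas.
        fmt.Println(usageRatioReplicaCount(5, 1.05, 0.1, 5))
        // ratio 1.6 with 5 ready pods: ceil(8.0) = 8 replicas.
        fmt.Println(usageRatioReplicaCount(5, 1.6, 0.1, 5))
    }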
    
