美文网首页
k8s 之 hpa controller 源码简单分析

k8s 之 hpa controller 源码简单分析

作者: wwq2020 | 来源:发表于2020-07-21 19:49 被阅读0次

简介

hpa controller 根据 HorizontalPodAutoscalerUseRESTClients 根据 metric server 还是 heapster
根据资源使用率来进行扩容/缩容,调用 apiserver 的接口更新他的子资源(scale)

创建处

func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, error) {
    store := &genericregistry.Store{
        NewFunc:                  func() runtime.Object { return &autoscaling.HorizontalPodAutoscaler{} },
        NewListFunc:              func() runtime.Object { return &autoscaling.HorizontalPodAutoscalerList{} },
        DefaultQualifiedResource: autoscaling.Resource("horizontalpodautoscalers"),

        CreateStrategy: horizontalpodautoscaler.Strategy,
        UpdateStrategy: horizontalpodautoscaler.Strategy,
        DeleteStrategy: horizontalpodautoscaler.Strategy,

        TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
    }
    options := &generic.StoreOptions{RESTOptions: optsGetter}
    if err := store.CompleteWithOptions(options); err != nil {
        return nil, nil, err
    }

    statusStore := *store
    statusStore.UpdateStrategy = horizontalpodautoscaler.StatusStrategy
    return &REST{store}, &StatusREST{store: &statusStore}, nil
}

controller

cmd/kube-controller-manager/app/autoscaling.go 中


func startHPAController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "autoscaling", Version: "v1", Resource: "horizontalpodautoscalers"}] {
        return nil, false, nil
    }

    if ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerUseRESTClients {
        // use the new-style clients if support for custom metrics is enabled
        return startHPAControllerWithRESTClient(ctx)
    }

    return startHPAControllerWithLegacyClient(ctx)
}

func startHPAControllerWithRESTClient(ctx ControllerContext) (http.Handler, bool, error) {
    clientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")
    hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")

    apiVersionsGetter := custom_metrics.NewAvailableAPIsGetter(hpaClient.Discovery())
    // invalidate the discovery information roughly once per resync interval our API
    // information is *at most* two resync intervals old.
    go custom_metrics.PeriodicallyInvalidate(
        apiVersionsGetter,
        ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
        ctx.Stop)

    metricsClient := metrics.NewRESTMetricsClient(
        resourceclient.NewForConfigOrDie(clientConfig),
        custom_metrics.NewForConfig(clientConfig, ctx.RESTMapper, apiVersionsGetter),
        external_metrics.NewForConfigOrDie(clientConfig),
    )
    return startHPAControllerWithMetricsClient(ctx, metricsClient)
}


func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient metrics.MetricsClient) (http.Handler, bool, error) {
    hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler")
    hpaClientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler")

    // we don't use cached discovery because DiscoveryScaleKindResolver does its own caching,
    // so we want to re-fetch every time when we actually ask for it
    scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery())
    scaleClient, err := scale.NewForConfig(hpaClientConfig, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver)
    if err != nil {
        return nil, false, err
    }

    go podautoscaler.NewHorizontalController(
        hpaClient.CoreV1(),
        scaleClient,
        hpaClient.AutoscalingV1(),
        ctx.RESTMapper,
        metricsClient,
        ctx.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration,
        ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration,
        ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance,
        ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration,
        ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration,
    ).Run(ctx.Stop)
    return nil, true, nil
}


pkg/controller/podautoscaler/horizontal.go 中

func NewHorizontalController(
    evtNamespacer v1core.EventsGetter,
    scaleNamespacer scaleclient.ScalesGetter,
    hpaNamespacer autoscalingclient.HorizontalPodAutoscalersGetter,
    mapper apimeta.RESTMapper,
    metricsClient metricsclient.MetricsClient,
    hpaInformer autoscalinginformers.HorizontalPodAutoscalerInformer,
    podInformer coreinformers.PodInformer,
    resyncPeriod time.Duration,
    downscaleStabilisationWindow time.Duration,
    tolerance float64,
    cpuInitializationPeriod,
    delayOfInitialReadinessStatus time.Duration,

) *HorizontalController {
    broadcaster := record.NewBroadcaster()
    broadcaster.StartStructuredLogging(0)
    broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: evtNamespacer.Events("")})
    recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "horizontal-pod-autoscaler"})

    hpaController := &HorizontalController{
        eventRecorder:                recorder,
        scaleNamespacer:              scaleNamespacer,
        hpaNamespacer:                hpaNamespacer,
        downscaleStabilisationWindow: downscaleStabilisationWindow,
        queue:                        workqueue.NewNamedRateLimitingQueue(NewDefaultHPARateLimiter(resyncPeriod), "horizontalpodautoscaler"),
        mapper:                       mapper,
        recommendations:              map[string][]timestampedRecommendation{},
        scaleUpEvents:                map[string][]timestampedScaleEvent{},
        scaleDownEvents:              map[string][]timestampedScaleEvent{},
    }

    hpaInformer.Informer().AddEventHandlerWithResyncPeriod(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    hpaController.enqueueHPA,
            UpdateFunc: hpaController.updateHPA,
            DeleteFunc: hpaController.deleteHPA,
        },
        resyncPeriod,
    )
    hpaController.hpaLister = hpaInformer.Lister()
    hpaController.hpaListerSynced = hpaInformer.Informer().HasSynced

    hpaController.podLister = podInformer.Lister()
    hpaController.podListerSynced = podInformer.Informer().HasSynced

    replicaCalc := NewReplicaCalculator(
        metricsClient,
        hpaController.podLister,
        tolerance,
        cpuInitializationPeriod,
        delayOfInitialReadinessStatus,
    )
    hpaController.replicaCalc = replicaCalc

    return hpaController
}

func (a *HorizontalController) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer a.queue.ShutDown()

    klog.Infof("Starting HPA controller")
    defer klog.Infof("Shutting down HPA controller")

    if !cache.WaitForNamedCacheSync("HPA", stopCh, a.hpaListerSynced, a.podListerSynced) {
        return
    }

    // start a single worker (we may wish to start more in the future)
    go wait.Until(a.worker, time.Second, stopCh)

    <-stopCh
}


func (a *HorizontalController) worker() {
    for a.processNextWorkItem() {
    }
    klog.Infof("horizontal pod autoscaler controller worker shutting down")
}

func (a *HorizontalController) processNextWorkItem() bool {
    key, quit := a.queue.Get()
    if quit {
        return false
    }
    defer a.queue.Done(key)

    deleted, err := a.reconcileKey(key.(string))
    if err != nil {
        utilruntime.HandleError(err)
    }
    // Add request processing HPA to queue with resyncPeriod delay.
    // Requests are always added to queue with resyncPeriod delay. If there's already request
    // for the HPA in the queue then a new request is always dropped. Requests spend resyncPeriod
    // in queue so HPAs are processed every resyncPeriod.
    // Request is added here just in case last resync didn't insert request into the queue. This
    // happens quite often because there is race condition between adding request after resyncPeriod
    // and removing them from queue. Request can be added by resync before previous request is
    // removed from queue. If we didn't add request here then in this case one request would be dropped
    // and HPA would processed after 2 x resyncPeriod.
    if !deleted {
        a.queue.AddRateLimited(key)
    }

    return true
}



func (a *HorizontalController) reconcileKey(key string) (deleted bool, err error) {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return true, err
    }

    hpa, err := a.hpaLister.HorizontalPodAutoscalers(namespace).Get(name)
    if errors.IsNotFound(err) {
        klog.Infof("Horizontal Pod Autoscaler %s has been deleted in %s", name, namespace)
        delete(a.recommendations, key)
        delete(a.scaleUpEvents, key)
        delete(a.scaleDownEvents, key)
        return true, nil
    }
    if err != nil {
        return false, err
    }

    return false, a.reconcileAutoscaler(hpa, key)
}


func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.HorizontalPodAutoscaler, key string) error {
    // make a copy so that we never mutate the shared informer cache (conversion can mutate the object)
    hpav1 := hpav1Shared.DeepCopy()
    // then, convert to autoscaling/v2, which makes our lives easier when calculating metrics
    hpaRaw, err := unsafeConvertToVersionVia(hpav1, autoscalingv2.SchemeGroupVersion)
    if err != nil {
        a.eventRecorder.Event(hpav1, v1.EventTypeWarning, "FailedConvertHPA", err.Error())
        return fmt.Errorf("failed to convert the given HPA to %s: %v", autoscalingv2.SchemeGroupVersion.String(), err)
    }
    hpa := hpaRaw.(*autoscalingv2.HorizontalPodAutoscaler)
    hpaStatusOriginal := hpa.Status.DeepCopy()

    reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name)

    targetGV, err := schema.ParseGroupVersion(hpa.Spec.ScaleTargetRef.APIVersion)
    if err != nil {
        a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
        setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
        a.updateStatusIfNeeded(hpaStatusOriginal, hpa)
        return fmt.Errorf("invalid API version in scale target reference: %v", err)
    }

    targetGK := schema.GroupKind{
        Group: targetGV.Group,
        Kind:  hpa.Spec.ScaleTargetRef.Kind,
    }

    mappings, err := a.mapper.RESTMappings(targetGK)
    if err != nil {
        a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
        setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
        a.updateStatusIfNeeded(hpaStatusOriginal, hpa)
        return fmt.Errorf("unable to determine resource for scale target reference: %v", err)
    }

    scale, targetGR, err := a.scaleForResourceMappings(hpa.Namespace, hpa.Spec.ScaleTargetRef.Name, mappings)
    if err != nil {
        a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
        setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
        a.updateStatusIfNeeded(hpaStatusOriginal, hpa)
        return fmt.Errorf("failed to query scale subresource for %s: %v", reference, err)
    }
    setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededGetScale", "the HPA controller was able to get the target's current scale")
    currentReplicas := scale.Spec.Replicas
    a.recordInitialRecommendation(currentReplicas, key)

    var (
        metricStatuses        []autoscalingv2.MetricStatus
        metricDesiredReplicas int32
        metricName            string
    )

    desiredReplicas := int32(0)
    rescaleReason := ""

    var minReplicas int32

    if hpa.Spec.MinReplicas != nil {
        minReplicas = *hpa.Spec.MinReplicas
    } else {
        // Default value
        minReplicas = 1
    }

    rescale := true

    if scale.Spec.Replicas == 0 && minReplicas != 0 {
        // Autoscaling is disabled for this resource
        desiredReplicas = 0
        rescale = false
        setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "ScalingDisabled", "scaling is disabled since the replica count of the target is zero")
    } else if currentReplicas > hpa.Spec.MaxReplicas {
        rescaleReason = "Current number of replicas above Spec.MaxReplicas"
        desiredReplicas = hpa.Spec.MaxReplicas
    } else if currentReplicas < minReplicas {
        rescaleReason = "Current number of replicas below Spec.MinReplicas"
        desiredReplicas = minReplicas
    } else {
        var metricTimestamp time.Time
        metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(hpa, scale, hpa.Spec.Metrics)
        if err != nil {
            a.setCurrentReplicasInStatus(hpa, currentReplicas)
            if err := a.updateStatusIfNeeded(hpaStatusOriginal, hpa); err != nil {
                utilruntime.HandleError(err)
            }
            a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedComputeMetricsReplicas", err.Error())
            return fmt.Errorf("failed to compute desired number of replicas based on listed metrics for %s: %v", reference, err)
        }

        klog.V(4).Infof("proposing %v desired replicas (based on %s from %s) for %s", metricDesiredReplicas, metricName, metricTimestamp, reference)

        rescaleMetric := ""
        if metricDesiredReplicas > desiredReplicas {
            desiredReplicas = metricDesiredReplicas
            rescaleMetric = metricName
        }
        if desiredReplicas > currentReplicas {
            rescaleReason = fmt.Sprintf("%s above target", rescaleMetric)
        }
        if desiredReplicas < currentReplicas {
            rescaleReason = "All metrics below target"
        }
        if hpa.Spec.Behavior == nil {
            desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas, minReplicas)
        } else {
            desiredReplicas = a.normalizeDesiredReplicasWithBehaviors(hpa, key, currentReplicas, desiredReplicas, minReplicas)
        }
        rescale = desiredReplicas != currentReplicas
    }

    if rescale {
        scale.Spec.Replicas = desiredReplicas
        _, err = a.scaleNamespacer.Scales(hpa.Namespace).Update(context.TODO(), targetGR, scale, metav1.UpdateOptions{})
        if err != nil {
            a.eventRecorder.Eventf(hpa, v1.EventTypeWarning, "FailedRescale", "New size: %d; reason: %s; error: %v", desiredReplicas, rescaleReason, err.Error())
            setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedUpdateScale", "the HPA controller was unable to update the target scale: %v", err)
            a.setCurrentReplicasInStatus(hpa, currentReplicas)
            if err := a.updateStatusIfNeeded(hpaStatusOriginal, hpa); err != nil {
                utilruntime.HandleError(err)
            }
            return fmt.Errorf("failed to rescale %s: %v", reference, err)
        }
        setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededRescale", "the HPA controller was able to update the target scale to %d", desiredReplicas)
        a.eventRecorder.Eventf(hpa, v1.EventTypeNormal, "SuccessfulRescale", "New size: %d; reason: %s", desiredReplicas, rescaleReason)
        a.storeScaleEvent(hpa.Spec.Behavior, key, currentReplicas, desiredReplicas)
        klog.Infof("Successful rescale of %s, old size: %d, new size: %d, reason: %s",
            hpa.Name, currentReplicas, desiredReplicas, rescaleReason)
    } else {
        klog.V(4).Infof("decided not to scale %s to %v (last scale time was %s)", reference, desiredReplicas, hpa.Status.LastScaleTime)
        desiredReplicas = currentReplicas
    }

    a.setStatus(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)
    return a.updateStatusIfNeeded(hpaStatusOriginal, hpa)
}

func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale,
    metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) {

    if scale.Status.Selector == "" {
        errMsg := "selector is required"
        a.eventRecorder.Event(hpa, v1.EventTypeWarning, "SelectorRequired", errMsg)
        setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "InvalidSelector", "the HPA target's scale is missing a selector")
        return 0, "", nil, time.Time{}, fmt.Errorf(errMsg)
    }

    selector, err := labels.Parse(scale.Status.Selector)
    if err != nil {
        errMsg := fmt.Sprintf("couldn't convert selector into a corresponding internal selector object: %v", err)
        a.eventRecorder.Event(hpa, v1.EventTypeWarning, "InvalidSelector", errMsg)
        setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "InvalidSelector", errMsg)
        return 0, "", nil, time.Time{}, fmt.Errorf(errMsg)
    }

    specReplicas := scale.Spec.Replicas
    statusReplicas := scale.Status.Replicas
    statuses = make([]autoscalingv2.MetricStatus, len(metricSpecs))

    invalidMetricsCount := 0
    var invalidMetricError error
    var invalidMetricCondition autoscalingv2.HorizontalPodAutoscalerCondition

    for i, metricSpec := range metricSpecs {
        replicaCountProposal, metricNameProposal, timestampProposal, condition, err := a.computeReplicasForMetric(hpa, metricSpec, specReplicas, statusReplicas, selector, &statuses[i])

        if err != nil {
            if invalidMetricsCount <= 0 {
                invalidMetricCondition = condition
                invalidMetricError = err
            }
            invalidMetricsCount++
        }
        if err == nil && (replicas == 0 || replicaCountProposal > replicas) {
            timestamp = timestampProposal
            replicas = replicaCountProposal
            metric = metricNameProposal
        }
    }

    // If all metrics are invalid return error and set condition on hpa based on first invalid metric.
    if invalidMetricsCount >= len(metricSpecs) {
        setCondition(hpa, invalidMetricCondition.Type, invalidMetricCondition.Status, invalidMetricCondition.Reason, invalidMetricCondition.Message)
        return 0, "", statuses, time.Time{}, fmt.Errorf("invalid metrics (%v invalid out of %v), first error is: %v", invalidMetricsCount, len(metricSpecs), invalidMetricError)
    }
    setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionTrue, "ValidMetricFound", "the HPA was able to successfully calculate a replica count from %s", metric)
    return replicas, metric, statuses, timestamp, nil
}

func (a *HorizontalController) computeReplicasForMetric(hpa *autoscalingv2.HorizontalPodAutoscaler, spec autoscalingv2.MetricSpec,
    specReplicas, statusReplicas int32, selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, metricNameProposal string,
    timestampProposal time.Time, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {

    switch spec.Type {
    case autoscalingv2.ObjectMetricSourceType:
        metricSelector, err := metav1.LabelSelectorAsSelector(spec.Object.Metric.Selector)
        if err != nil {
            condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
            return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err)
        }
        replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForObjectMetric(specReplicas, statusReplicas, spec, hpa, selector, status, metricSelector)
        if err != nil {
            return 0, "", time.Time{}, condition, fmt.Errorf("failed to get object metric value: %v", err)
        }
    case autoscalingv2.PodsMetricSourceType:
        metricSelector, err := metav1.LabelSelectorAsSelector(spec.Pods.Metric.Selector)
        if err != nil {
            condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err)
            return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
        }
        replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForPodsMetric(specReplicas, spec, hpa, selector, status, metricSelector)
        if err != nil {
            return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err)
        }
    case autoscalingv2.ResourceMetricSourceType:
        replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForResourceMetric(specReplicas, spec, hpa, selector, status)
        if err != nil {
            return 0, "", time.Time{}, condition, err
        }
    case autoscalingv2.ExternalMetricSourceType:
        replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForExternalMetric(specReplicas, statusReplicas, spec, hpa, selector, status)
        if err != nil {
            return 0, "", time.Time{}, condition, err
        }
    default:
        errMsg := fmt.Sprintf("unknown metric source type %q", string(spec.Type))
        err = fmt.Errorf(errMsg)
        condition := a.getUnableComputeReplicaCountCondition(hpa, "InvalidMetricSourceType", err)
        return 0, "", time.Time{}, condition, err
    }
    return replicaCountProposal, metricNameProposal, timestampProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
}

func (a *HorizontalController) computeStatusForObjectMetric(specReplicas, statusReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler, selector labels.Selector, status *autoscalingv2.MetricStatus, metricSelector labels.Selector) (replicas int32, timestamp time.Time, metricName string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) {
    if metricSpec.Object.Target.Type == autoscalingv2.ValueMetricType {
        replicaCountProposal, utilizationProposal, timestampProposal, err := a.replicaCalc.GetObjectMetricReplicas(specReplicas, metricSpec.Object.Target.Value.MilliValue(), metricSpec.Object.Metric.Name, hpa.Namespace, &metricSpec.Object.DescribedObject, selector, metricSelector)
        if err != nil {
            condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
            return 0, timestampProposal, "", condition, err
        }
        *status = autoscalingv2.MetricStatus{
            Type: autoscalingv2.ObjectMetricSourceType,
            Object: &autoscalingv2.ObjectMetricStatus{
                DescribedObject: metricSpec.Object.DescribedObject,
                Metric: autoscalingv2.MetricIdentifier{
                    Name:     metricSpec.Object.Metric.Name,
                    Selector: metricSpec.Object.Metric.Selector,
                },
                Current: autoscalingv2.MetricValueStatus{
                    Value: resource.NewMilliQuantity(utilizationProposal, resource.DecimalSI),
                },
            },
        }
        return replicaCountProposal, timestampProposal, fmt.Sprintf("%s metric %s", metricSpec.Object.DescribedObject.Kind, metricSpec.Object.Metric.Name), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
    } else if metricSpec.Object.Target.Type == autoscalingv2.AverageValueMetricType {
        replicaCountProposal, utilizationProposal, timestampProposal, err := a.replicaCalc.GetObjectPerPodMetricReplicas(statusReplicas, metricSpec.Object.Target.AverageValue.MilliValue(), metricSpec.Object.Metric.Name, hpa.Namespace, &metricSpec.Object.DescribedObject, metricSelector)
        if err != nil {
            condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
            return 0, time.Time{}, "", condition, fmt.Errorf("failed to get %s object metric: %v", metricSpec.Object.Metric.Name, err)
        }
        *status = autoscalingv2.MetricStatus{
            Type: autoscalingv2.ObjectMetricSourceType,
            Object: &autoscalingv2.ObjectMetricStatus{
                Metric: autoscalingv2.MetricIdentifier{
                    Name:     metricSpec.Object.Metric.Name,
                    Selector: metricSpec.Object.Metric.Selector,
                },
                Current: autoscalingv2.MetricValueStatus{
                    AverageValue: resource.NewMilliQuantity(utilizationProposal, resource.DecimalSI),
                },
            },
        }
        return replicaCountProposal, timestampProposal, fmt.Sprintf("external metric %s(%+v)", metricSpec.Object.Metric.Name, metricSpec.Object.Metric.Selector), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil
    }
    errMsg := "invalid object metric source: neither a value target nor an average value target was set"
    err = fmt.Errorf(errMsg)
    condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetObjectMetric", err)
    return 0, time.Time{}, "", condition, err
}

pkg/controller/podautoscaler/horizontal.go 中

func (c *ReplicaCalculator) GetObjectMetricReplicas(currentReplicas int32, targetUtilization int64, metricName string, namespace string, objectRef *autoscaling.CrossVersionObjectReference, selector labels.Selector, metricSelector labels.Selector) (replicaCount int32, utilization int64, timestamp time.Time, err error) {
    utilization, timestamp, err = c.metricsClient.GetObjectMetric(metricName, namespace, objectRef, metricSelector)
    if err != nil {
        return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v on %s %s/%s", metricName, objectRef.Kind, namespace, objectRef.Name, err)
    }

    usageRatio := float64(utilization) / float64(targetUtilization)
    replicaCount, timestamp, err = c.getUsageRatioReplicaCount(currentReplicas, usageRatio, namespace, selector)
    return replicaCount, utilization, timestamp, err
}

相关文章

网友评论

      本文标题:k8s 之 hpa controller 源码简单分析

      本文链接:https://www.haomeiwen.com/subject/gumwkktx.html