From ba49ff1a79ca11d83e2891315bc106316aa015fe Mon Sep 17 00:00:00 2001 From: John Kyros Date: Thu, 18 Apr 2024 20:20:54 -0500 Subject: [PATCH 1/7] VPA: fix erroneous container resource split Previously we were dividing the resources per pod by the number of container aggregates, but in a situation where we're doing a rollout and the container names are changing (either a rename, or a removal) we're splitting resources across the wrong number of containers, resulting in smaller values than we should actually have. This collects a count of containers in the model when the pods are loaded, and uses the "high water mark value", so in the event we are doing something like adding a container during a rollout, we favor the pod that has the additional container. There are probably better ways to do this plumbing, but this was my initial attempt, and it does fix the issue. --- .../pkg/recommender/logic/recommender.go | 6 ++--- .../pkg/recommender/logic/recommender_test.go | 8 +++---- .../pkg/recommender/model/cluster.go | 24 +++++++++++++++++++ .../pkg/recommender/model/vpa.go | 10 ++++++++ .../pkg/recommender/routines/recommender.go | 2 +- 5 files changed, 42 insertions(+), 8 deletions(-) diff --git a/vertical-pod-autoscaler/pkg/recommender/logic/recommender.go b/vertical-pod-autoscaler/pkg/recommender/logic/recommender.go index 8aaf9cbd5e9..5d553466dc1 100644 --- a/vertical-pod-autoscaler/pkg/recommender/logic/recommender.go +++ b/vertical-pod-autoscaler/pkg/recommender/logic/recommender.go @@ -39,7 +39,7 @@ var ( // PodResourceRecommender computes resource recommendation for a Vpa object. type PodResourceRecommender interface { - GetRecommendedPodResources(containerNameToAggregateStateMap model.ContainerNameToAggregateStateMap) RecommendedPodResources + GetRecommendedPodResources(containerNameToAggregateStateMap model.ContainerNameToAggregateStateMap, containersPerPod int) RecommendedPodResources } // RecommendedPodResources is a Map from container name to recommended resources. @@ -65,13 +65,13 @@ type podResourceRecommender struct { upperBoundMemory MemoryEstimator } -func (r *podResourceRecommender) GetRecommendedPodResources(containerNameToAggregateStateMap model.ContainerNameToAggregateStateMap) RecommendedPodResources { +func (r *podResourceRecommender) GetRecommendedPodResources(containerNameToAggregateStateMap model.ContainerNameToAggregateStateMap, containersPerPod int) RecommendedPodResources { var recommendation = make(RecommendedPodResources) if len(containerNameToAggregateStateMap) == 0 { return recommendation } - fraction := 1.0 / float64(len(containerNameToAggregateStateMap)) + fraction := 1.0 / float64(containersPerPod) minCPU := model.ScaleResource(model.CPUAmountFromCores(*podMinCPUMillicores*0.001), fraction) minMemory := model.ScaleResource(model.MemoryAmountFromBytes(*podMinMemoryMb*1024*1024), fraction) diff --git a/vertical-pod-autoscaler/pkg/recommender/logic/recommender_test.go b/vertical-pod-autoscaler/pkg/recommender/logic/recommender_test.go index 5824745a461..54f70af7bb8 100644 --- a/vertical-pod-autoscaler/pkg/recommender/logic/recommender_test.go +++ b/vertical-pod-autoscaler/pkg/recommender/logic/recommender_test.go @@ -40,7 +40,7 @@ func TestMinResourcesApplied(t *testing.T) { "container-1": &model.AggregateContainerState{}, } - recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap) + recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap, len(containerNameToAggregateStateMap)) assert.Equal(t, model.CPUAmountFromCores(*podMinCPUMillicores/1000), recommendedResources["container-1"].Target[model.ResourceCPU]) assert.Equal(t, model.MemoryAmountFromBytes(*podMinMemoryMb*1024*1024), recommendedResources["container-1"].Target[model.ResourceMemory]) } @@ -63,7 +63,7 @@ func TestMinResourcesSplitAcrossContainers(t *testing.T) { "container-2": &model.AggregateContainerState{}, } - recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap) + recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap, len(containerNameToAggregateStateMap)) assert.Equal(t, model.CPUAmountFromCores((*podMinCPUMillicores/1000)/2), recommendedResources["container-1"].Target[model.ResourceCPU]) assert.Equal(t, model.CPUAmountFromCores((*podMinCPUMillicores/1000)/2), recommendedResources["container-2"].Target[model.ResourceCPU]) assert.Equal(t, model.MemoryAmountFromBytes((*podMinMemoryMb*1024*1024)/2), recommendedResources["container-1"].Target[model.ResourceMemory]) @@ -90,7 +90,7 @@ func TestControlledResourcesFiltered(t *testing.T) { }, } - recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap) + recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap, len(containerNameToAggregateStateMap)) assert.Contains(t, recommendedResources[containerName].Target, model.ResourceMemory) assert.Contains(t, recommendedResources[containerName].LowerBound, model.ResourceMemory) assert.Contains(t, recommendedResources[containerName].UpperBound, model.ResourceMemory) @@ -119,7 +119,7 @@ func TestControlledResourcesFilteredDefault(t *testing.T) { }, } - recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap) + recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap, len(containerNameToAggregateStateMap)) assert.Contains(t, recommendedResources[containerName].Target, model.ResourceMemory) assert.Contains(t, recommendedResources[containerName].LowerBound, model.ResourceMemory) assert.Contains(t, recommendedResources[containerName].UpperBound, model.ResourceMemory) diff --git a/vertical-pod-autoscaler/pkg/recommender/model/cluster.go b/vertical-pod-autoscaler/pkg/recommender/model/cluster.go index 8fd2fd8f030..2ef2f93b8a6 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/cluster.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/cluster.go @@ -144,9 +144,28 @@ func (cluster *ClusterState) AddOrUpdatePod(podID PodID, newLabels labels.Set, p cluster.addPodToItsVpa(pod) } + // Tally the number of containers for later when we're averaging the recommendations + cluster.setVPAContainersPerPod(pod) pod.Phase = phase } +// setVPAContainersPerPod sets the number of containers per pod seen for pods connected to this VPA +// so that later when we're splitting the minimum recommendations over containers, we're splitting them over +// the correct number and not just the number of aggregates that have *ever* been present. (We don't want minimum resources +// to erroneously shrink, either) +func (cluster *ClusterState) setVPAContainersPerPod(pod *PodState) { + for _, vpa := range cluster.Vpas { + if vpa_utils.PodLabelsMatchVPA(pod.ID.Namespace, cluster.labelSetMap[pod.labelSetKey], vpa.ID.Namespace, vpa.PodSelector) { + // We want the "high water mark" of the most containers in the pod in the event + // that we're rolling out a pod that has an additional container + if len(pod.Containers) > vpa.ContainersPerPod { + vpa.ContainersPerPod = len(pod.Containers) + } + } + } + +} + // addPodToItsVpa increases the count of Pods associated with a VPA object. // Does a scan similar to findOrCreateAggregateContainerState so could be optimized if needed. func (cluster *ClusterState) addPodToItsVpa(pod *PodState) { @@ -275,6 +294,11 @@ func (cluster *ClusterState) AddOrUpdateVpa(apiObject *vpa_types.VerticalPodAuto } vpa.PodCount = len(cluster.GetMatchingPods(vpa)) } + + // Default this to the minimum, we will tally the true number when we load the pods later + // TODO(jkyros): This is gross, it depends on the order I know it currently loads things in, but + // that might not be the case someday + vpa.ContainersPerPod = 1 vpa.TargetRef = apiObject.Spec.TargetRef vpa.Annotations = annotationsMap vpa.Conditions = conditionsMap diff --git a/vertical-pod-autoscaler/pkg/recommender/model/vpa.go b/vertical-pod-autoscaler/pkg/recommender/model/vpa.go index b4f32e07f8c..069a16230ed 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/vpa.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/vpa.go @@ -110,6 +110,11 @@ type Vpa struct { TargetRef *autoscaling.CrossVersionObjectReference // PodCount contains number of live Pods matching a given VPA object. PodCount int + // ContainersPerPod contains the "high water mark" of the number of containers + // per pod to average the recommendation across. Used to make sure we aren't + // "fractionalizing" minResources erroneously during a redeploy when when a pod's + // container is removed or renamed + ContainersPerPod int } // NewVpa returns a new Vpa with a given ID and pod selector. Doesn't set the @@ -209,6 +214,11 @@ func (vpa *Vpa) AggregateStateByContainerName() ContainerNameToAggregateStateMap return containerNameToAggregateStateMap } +// AggregateContainerStates returns the underlying internal aggregate state map. +func (vpa *Vpa) AggregateContainerStates() aggregateContainerStatesMap { + return vpa.aggregateContainerStates +} + // HasRecommendation returns if the VPA object contains any recommendation func (vpa *Vpa) HasRecommendation() bool { return (vpa.Recommendation != nil) && len(vpa.Recommendation.ContainerRecommendations) > 0 diff --git a/vertical-pod-autoscaler/pkg/recommender/routines/recommender.go b/vertical-pod-autoscaler/pkg/recommender/routines/recommender.go index 1f456a80c18..6482dfd734d 100644 --- a/vertical-pod-autoscaler/pkg/recommender/routines/recommender.go +++ b/vertical-pod-autoscaler/pkg/recommender/routines/recommender.go @@ -91,7 +91,7 @@ func (r *recommender) UpdateVPAs() { if !found { continue } - resources := r.podResourceRecommender.GetRecommendedPodResources(GetContainerNameToAggregateStateMap(vpa)) + resources := r.podResourceRecommender.GetRecommendedPodResources(GetContainerNameToAggregateStateMap(vpa), vpa.ContainersPerPod) had := vpa.HasRecommendation() listOfResourceRecommendation := logic.MapToListOfRecommendedContainerResources(resources) From ecedecffce018a69d7547fb92175c0f1163ccbcf Mon Sep 17 00:00:00 2001 From: John Kyros Date: Thu, 18 Apr 2024 20:26:32 -0500 Subject: [PATCH 2/7] VPA: clean orphaned container checkpoints Previously we were only cleaning checkpoints after something happened to the VPA or the targetRef, and so when a container got renamed the checkpoint would stick around forever. Since we're trying to clean up the aggregates immediately now, we need to force the checkpoint garbage collection to clean up any checkpoints that don't have matching aggregates. If the checkpoints did get loaded back in after a restart, PruneContainers() would take the aggregates back out, but we probably shouldn't leave the checkpoints out there. Signed-off-by: Max Cao --- .../pkg/recommender/input/cluster_feeder.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go index 5fef7df5deb..48148df28c8 100644 --- a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go +++ b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go @@ -290,7 +290,7 @@ func (feeder *clusterStateFeeder) GarbageCollectCheckpoints() { } for _, checkpoint := range checkpointList.Items { vpaID := model.VpaID{Namespace: checkpoint.Namespace, VpaName: checkpoint.Spec.VPAObjectName} - _, exists := feeder.clusterState.Vpas[vpaID] + vpa, exists := feeder.clusterState.Vpas[vpaID] if !exists { err = feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).Delete(context.TODO(), checkpoint.Name, metav1.DeleteOptions{}) if err == nil { @@ -299,6 +299,20 @@ func (feeder *clusterStateFeeder) GarbageCollectCheckpoints() { klog.ErrorS(err, "Orphaned VPA checkpoint cleanup - error deleting", "checkpoint", klog.KRef(namespace, checkpoint.Name)) } } + // Also clean up a checkpoint if the VPA is still there, but the container is gone. AggregateStateByContainerName + // merges in the initial aggregates so we can use it to check "both lists" (initial, aggregates) at once + // TODO(jkyros): could we also just wait until it got "old" enough, e.g. the checkpoint hasn't + // been updated for an hour, blow it away? Because once we remove it from the aggregate lists, it will stop + // being maintained. + _, aggregateExists := vpa.AggregateStateByContainerName()[checkpoint.Spec.ContainerName] + if !aggregateExists { + err = feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).Delete(context.TODO(), checkpoint.Name, metav1.DeleteOptions{}) + if err == nil { + klog.V(3).InfoS("Orphaned VPA checkpoint cleanup - deleting", "checkpoint", klog.KRef(namespace, checkpoint.Name)) + } else { + klog.ErrorS(err, "Orphaned VPA checkpoint cleanup - error deleting", "checkpoint", klog.KRef(namespace, checkpoint.Name)) + } + } } } } From dc6c46872c4c33f0ae8a70d886ac0c9ceafd11fa Mon Sep 17 00:00:00 2001 From: John Kyros Date: Fri, 19 Apr 2024 21:24:17 -0500 Subject: [PATCH 3/7] VPA: immediately prune stale vpa aggregates Previously we were letting the rate limited garbage collector clean up the aggregate states, and that works really well in most cases, but when the list of containers in a pod changes, either due to the removal or rename of a container, the aggregates for the old containers stick around forever and cause problems. To get around this, this marks all existing aggregates/initial aggregates in the list for each VPA as "not under a VPA" every time before we LoadPods(), and then LoadPods() will re-mark the aggregates as "under a VPA" for all the ones that are still there, which lets us easily prune the stale container aggregates that are still marked as "not under a VPA" but are still wrongly in the VPA's list. This does leave the ultimate garbage collection to the rate limited garbage collector, which should be fine, we just needed the stale entries to get removed from the per-VPA lists so they didn't affect VPA behavior. --- .../pkg/recommender/input/cluster_feeder.go | 59 +++++++++++++++++++ .../pkg/recommender/model/cluster.go | 4 +- .../pkg/recommender/routines/recommender.go | 6 ++ 3 files changed, 68 insertions(+), 1 deletion(-) diff --git a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go index 48148df28c8..dd4a2fa8cdc 100644 --- a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go +++ b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go @@ -73,6 +73,12 @@ type ClusterStateFeeder interface { // GarbageCollectCheckpoints removes historical checkpoints that don't have a matching VPA. GarbageCollectCheckpoints() + + // MarkAggregates marks all aggregates in all VPAs as not under VPAs + MarkAggregates() + + // SweepAggregates garbage collects all aggregates in all VPAs aggregate lists that are no longer under VPAs + SweepAggregates() } // ClusterStateFeederFactory makes instances of ClusterStateFeeder. @@ -410,6 +416,59 @@ func (feeder *clusterStateFeeder) LoadVPAs(ctx context.Context) { feeder.clusterState.ObservedVpas = vpaCRDs } +// MarkAggregates marks all aggregates IsUnderVPA=false, so when we go +// through LoadPods(), the valid ones will get marked back to true, and +// we can garbage collect the false ones from the VPAs' aggregate lists. +func (feeder *clusterStateFeeder) MarkAggregates() { + for _, vpa := range feeder.clusterState.Vpas { + for _, container := range vpa.AggregateContainerStates() { + container.IsUnderVPA = false + } + for _, container := range vpa.ContainersInitialAggregateState { + container.IsUnderVPA = false + } + } +} + +// SweepAggregates garbage collects all aggregates/initial aggregates from the VPA where the +// aggregate's container no longer exists. +func (feeder *clusterStateFeeder) SweepAggregates() { + + var aggregatesPruned int + var initialAggregatesPruned int + + // TODO(jkyros): This only removes the container state from the VPA's aggregate states, there + // is still a reference to them in feeder.clusterState.aggregateStateMap, and those get + // garbage collected eventually by the rate limited aggregate garbage collector later. + // Maybe we should clean those up here too since we know which ones are stale? + for _, vpa := range feeder.clusterState.Vpas { + + for containerKey, container := range vpa.AggregateContainerStates() { + if !container.IsUnderVPA { + klog.V(4).InfoS("Deleting Aggregate for VPA: container no longer present", + "namespace", vpa.ID.Namespace, + "vpaName", vpa.ID.VpaName, + "containerName", containerKey.ContainerName()) + vpa.DeleteAggregation(containerKey) + aggregatesPruned = aggregatesPruned + 1 + } + } + for containerKey, container := range vpa.ContainersInitialAggregateState { + if !container.IsUnderVPA { + klog.V(4).InfoS("Deleting Initial Aggregate for VPA: container no longer present", + "namespace", vpa.ID.Namespace, + "vpaName", vpa.ID.VpaName, + "containerName", containerKey) + delete(vpa.ContainersInitialAggregateState, containerKey) + initialAggregatesPruned = initialAggregatesPruned + 1 + } + } + } + if initialAggregatesPruned > 0 || aggregatesPruned > 0 { + klog.InfoS("Pruned aggregate and initial aggregate containers", "aggregatesPruned", aggregatesPruned, "initialAggregatesPruned", initialAggregatesPruned) + } +} + // LoadPods loads pod into the cluster state. func (feeder *clusterStateFeeder) LoadPods() { podSpecs, err := feeder.specClient.GetPodSpecs() diff --git a/vertical-pod-autoscaler/pkg/recommender/model/cluster.go b/vertical-pod-autoscaler/pkg/recommender/model/cluster.go index 2ef2f93b8a6..09cce9e9c25 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/cluster.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/cluster.go @@ -216,12 +216,14 @@ func (cluster *ClusterState) AddOrUpdateContainer(containerID ContainerID, reque if !podExists { return NewKeyError(containerID.PodID) } + aggregateState := cluster.findOrCreateAggregateContainerState(containerID) if container, containerExists := pod.Containers[containerID.ContainerName]; !containerExists { - cluster.findOrCreateAggregateContainerState(containerID) pod.Containers[containerID.ContainerName] = NewContainerState(request, NewContainerStateAggregatorProxy(cluster, containerID)) } else { // Container aleady exists. Possibly update the request. container.Request = request + // Mark this container as still managed so the aggregates don't get garbage collected + aggregateState.IsUnderVPA = true } return nil } diff --git a/vertical-pod-autoscaler/pkg/recommender/routines/recommender.go b/vertical-pod-autoscaler/pkg/recommender/routines/recommender.go index 6482dfd734d..573602b33fb 100644 --- a/vertical-pod-autoscaler/pkg/recommender/routines/recommender.go +++ b/vertical-pod-autoscaler/pkg/recommender/routines/recommender.go @@ -150,9 +150,15 @@ func (r *recommender) RunOnce() { r.clusterStateFeeder.LoadVPAs(ctx) timer.ObserveStep("LoadVPAs") + r.clusterStateFeeder.MarkAggregates() + timer.ObserveStep("MarkAggregates") + r.clusterStateFeeder.LoadPods() timer.ObserveStep("LoadPods") + r.clusterStateFeeder.SweepAggregates() + timer.ObserveStep("SweepAggregates") + r.clusterStateFeeder.LoadRealTimeMetrics() timer.ObserveStep("LoadMetrics") klog.V(3).InfoS("ClusterState is tracking", "pods", len(r.clusterState.Pods), "vpas", len(r.clusterState.Vpas)) From d918d185121073150116c95de508beab5ca513b8 Mon Sep 17 00:00:00 2001 From: Max Cao Date: Tue, 3 Dec 2024 13:27:29 -0800 Subject: [PATCH 4/7] VPA: Slightly improve runtime on splitting aggregates Signed-off-by: Max Cao --- .../pkg/recommender/model/cluster.go | 37 ++++++++----------- 1 file changed, 16 insertions(+), 21 deletions(-) diff --git a/vertical-pod-autoscaler/pkg/recommender/model/cluster.go b/vertical-pod-autoscaler/pkg/recommender/model/cluster.go index 09cce9e9c25..71fea092f5d 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/cluster.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/cluster.go @@ -130,22 +130,24 @@ func (cluster *ClusterState) AddOrUpdatePod(podID PodID, newLabels labels.Set, p } newlabelSetKey := cluster.getLabelSetKey(newLabels) - if podExists && pod.labelSetKey != newlabelSetKey { - // This Pod is already counted in the old VPA, remove the link. - cluster.removePodFromItsVpa(pod) - } - if !podExists || pod.labelSetKey != newlabelSetKey { + if pod.labelSetKey != newlabelSetKey { + if podExists { + // This Pod is already counted in the old VPA, remove the link. + cluster.removePodFromItsVpa(pod) + } pod.labelSetKey = newlabelSetKey // Set the links between the containers and aggregations based on the current pod labels. for containerName, container := range pod.Containers { containerID := ContainerID{PodID: podID, ContainerName: containerName} container.aggregator = cluster.findOrCreateAggregateContainerState(containerID) } - - cluster.addPodToItsVpa(pod) + cluster.setVPAContainersPerPod(pod, true) + } else if !podExists { + cluster.setVPAContainersPerPod(pod, true) + } else if len(pod.Containers) > 1 { + // Tally the number of containers for later when we're averaging the recommendations if there's more than one container + cluster.setVPAContainersPerPod(pod, false) } - // Tally the number of containers for later when we're averaging the recommendations - cluster.setVPAContainersPerPod(pod) pod.Phase = phase } @@ -153,7 +155,8 @@ func (cluster *ClusterState) AddOrUpdatePod(podID PodID, newLabels labels.Set, p // so that later when we're splitting the minimum recommendations over containers, we're splitting them over // the correct number and not just the number of aggregates that have *ever* been present. (We don't want minimum resources // to erroneously shrink, either) -func (cluster *ClusterState) setVPAContainersPerPod(pod *PodState) { +// If addPodToItsVpa is true, it also increments the pod count for the VPA. +func (cluster *ClusterState) setVPAContainersPerPod(pod *PodState, addPodToItsVpa bool) { for _, vpa := range cluster.Vpas { if vpa_utils.PodLabelsMatchVPA(pod.ID.Namespace, cluster.labelSetMap[pod.labelSetKey], vpa.ID.Namespace, vpa.PodSelector) { // We want the "high water mark" of the most containers in the pod in the event @@ -161,17 +164,9 @@ func (cluster *ClusterState) setVPAContainersPerPod(pod *PodState) { if len(pod.Containers) > vpa.ContainersPerPod { vpa.ContainersPerPod = len(pod.Containers) } - } - } - -} - -// addPodToItsVpa increases the count of Pods associated with a VPA object. -// Does a scan similar to findOrCreateAggregateContainerState so could be optimized if needed. -func (cluster *ClusterState) addPodToItsVpa(pod *PodState) { - for _, vpa := range cluster.Vpas { - if vpa_utils.PodLabelsMatchVPA(pod.ID.Namespace, cluster.labelSetMap[pod.labelSetKey], vpa.ID.Namespace, vpa.PodSelector) { - vpa.PodCount++ + if addPodToItsVpa { + vpa.PodCount++ + } } } } From 9d94f625568141796f0f8a7bcab51104a07f38b9 Mon Sep 17 00:00:00 2001 From: Max Cao Date: Tue, 3 Dec 2024 13:28:33 -0800 Subject: [PATCH 5/7] VPA: Add e2e test for spliting recommendations when removing/renaming containers Signed-off-by: Max Cao --- vertical-pod-autoscaler/e2e/v1/common.go | 8 ++ vertical-pod-autoscaler/e2e/v1/recommender.go | 97 +++++++++++++++++++ 2 files changed, 105 insertions(+) diff --git a/vertical-pod-autoscaler/e2e/v1/common.go b/vertical-pod-autoscaler/e2e/v1/common.go index 6ce493c9a8b..a554a2a864e 100644 --- a/vertical-pod-autoscaler/e2e/v1/common.go +++ b/vertical-pod-autoscaler/e2e/v1/common.go @@ -352,6 +352,14 @@ func PatchVpaRecommendation(f *framework.Framework, vpa *vpa_types.VerticalPodAu gomega.Expect(err).NotTo(gomega.HaveOccurred(), "Failed to patch VPA.") } +// PatchDeployment patches a deployment with a given patch. +func PatchDeployment(f *framework.Framework, deployment *appsv1.Deployment, patch *patchRecord) { + patchBytes, err := json.Marshal([]patchRecord{*patch}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + _, err = f.ClientSet.AppsV1().Deployments(f.Namespace.Name).Patch(context.TODO(), deployment.Name, types.JSONPatchType, patchBytes, metav1.PatchOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred(), "unexpected error patching deployment") +} + // AnnotatePod adds annotation for an existing pod. func AnnotatePod(f *framework.Framework, podName, annotationName, annotationValue string) { bytes, err := json.Marshal([]patchRecord{{ diff --git a/vertical-pod-autoscaler/e2e/v1/recommender.go b/vertical-pod-autoscaler/e2e/v1/recommender.go index 8f171f33f5d..55398da3642 100644 --- a/vertical-pod-autoscaler/e2e/v1/recommender.go +++ b/vertical-pod-autoscaler/e2e/v1/recommender.go @@ -411,6 +411,103 @@ var _ = RecommenderE2eDescribe("VPA CRD object", func() { }) }) +const recommendationLoopInterval = 1 * time.Minute + +var _ = RecommenderE2eDescribe("VPA CRD object", func() { + f := framework.NewDefaultFramework("vertical-pod-autoscaling") + f.NamespacePodSecurityEnforceLevel = podsecurity.LevelBaseline + + var vpaClientSet vpa_clientset.Interface + + ginkgo.BeforeEach(func() { + vpaClientSet = getVpaClientSet(f) + }) + + ginkgo.It("only provides recommendation to containers that exist when renaming a container", func() { + ginkgo.By("Setting up a hamster deployment") + d := NewNHamstersDeployment(f, 1 /*number of containers*/) + _ = startDeploymentPods(f, d) + + ginkgo.By("Setting up VPA CRD") + vpaCRD := test.VerticalPodAutoscaler(). + WithName("hamster-vpa"). + WithNamespace(f.Namespace.Name). + WithTargetRef(hamsterTargetRef). + WithContainer("*"). + Get() + + InstallVPA(f, vpaCRD) + + ginkgo.By("Waiting for recommendation to be filled for the container") + vpa, err := WaitForRecommendationPresent(vpaClientSet, vpaCRD) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(vpa.Status.Recommendation.ContainerRecommendations).Should(gomega.HaveLen(1)) + gomega.Expect(vpa.Status.Recommendation.ContainerRecommendations[0].ContainerName).To(gomega.Equal(GetHamsterContainerNameByIndex(0))) + + ginkgo.By("Renaming the container") + newContainerName := "renamed-container" + patchRecord := &patchRecord{ + Op: "replace", + Path: "/spec/template/spec/containers/0/name", + Value: newContainerName, + } + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + PatchDeployment(f, d, patchRecord) + + ginkgo.By("Waiting for recommendation to be filled for the renamed container and only the renamed container") + time.Sleep(recommendationLoopInterval) + vpa, err = WaitForRecommendationPresent(vpaClientSet, vpaCRD) + + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + errMsg := fmt.Sprintf("%s is the only container in the VPA CR. There should not be any recommendations for %s", + newContainerName, + GetHamsterContainerNameByIndex(0)) + gomega.Expect(vpa.Status.Recommendation.ContainerRecommendations).Should(gomega.HaveLen(1), errMsg) + gomega.Expect(vpa.Status.Recommendation.ContainerRecommendations[0].ContainerName).To(gomega.Equal(newContainerName), errMsg) + }) + + ginkgo.It("only provides recommendation to containers that exist when removing a container", func() { + ginkgo.By("Setting up a hamster deployment") + d := NewNHamstersDeployment(f, 2 /*number of containers*/) + _ = startDeploymentPods(f, d) + + ginkgo.By("Setting up VPA CRD") + vpaCRD := test.VerticalPodAutoscaler(). + WithName("hamster-vpa"). + WithNamespace(f.Namespace.Name). + WithTargetRef(hamsterTargetRef). + WithContainer("*"). + Get() + + InstallVPA(f, vpaCRD) + + ginkgo.By("Waiting for recommendation to be filled for both containers") + vpa, err := WaitForRecommendationPresent(vpaClientSet, vpaCRD) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(vpa.Status.Recommendation.ContainerRecommendations).Should(gomega.HaveLen(2)) + + ginkgo.By("Removing the second container") + patchRecord := &patchRecord{ + Op: "remove", + Path: "/spec/template/spec/containers/1", + } + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + PatchDeployment(f, d, patchRecord) + + ginkgo.By("Waiting for recommendation to be filled for just one container") + time.Sleep(recommendationLoopInterval) + vpa, err = WaitForRecommendationPresent(vpaClientSet, vpaCRD) + + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + errMsg := fmt.Sprintf("%s is now the only container in the VPA CR. There should not be any recommendations for %s", + GetHamsterContainerNameByIndex(0), + GetHamsterContainerNameByIndex(1)) + gomega.Expect(vpa.Status.Recommendation.ContainerRecommendations).Should(gomega.HaveLen(1), errMsg) + gomega.Expect(vpa.Status.Recommendation.ContainerRecommendations[0].ContainerName).To(gomega.Equal(GetHamsterContainerNameByIndex(0)), errMsg) + }) + +}) + func deleteRecommender(c clientset.Interface) error { namespace := "kube-system" listOptions := metav1.ListOptions{} From b6e435bebde2a295229dac410577942da082ed9f Mon Sep 17 00:00:00 2001 From: Max Cao Date: Wed, 11 Dec 2024 16:30:51 -0800 Subject: [PATCH 6/7] Introduce pruningGracePeriod which allows a grace period before the vpa prunes recommendations from non-existente containers Signed-off-by: Max Cao --- .../deploy/vpa-v1-crd-gen.yaml | 25 ++++++++++++++----- .../pkg/apis/autoscaling.k8s.io/v1/types.go | 9 +++++++ .../pkg/recommender/input/cluster_feeder.go | 6 ++--- .../model/aggregate_container_state.go | 16 +++++++++++- .../pkg/recommender/model/cluster.go | 4 ++- .../pkg/recommender/model/vpa.go | 4 ++- .../pkg/recommender/model/vpa_test.go | 19 +++++++++----- vertical-pod-autoscaler/pkg/utils/vpa/api.go | 17 +++++++++++++ 8 files changed, 82 insertions(+), 18 deletions(-) diff --git a/vertical-pod-autoscaler/deploy/vpa-v1-crd-gen.yaml b/vertical-pod-autoscaler/deploy/vpa-v1-crd-gen.yaml index 3811ca72bf1..ad2b9de3e00 100644 --- a/vertical-pod-autoscaler/deploy/vpa-v1-crd-gen.yaml +++ b/vertical-pod-autoscaler/deploy/vpa-v1-crd-gen.yaml @@ -346,6 +346,19 @@ spec: - Auto - "Off" type: string + pruningGracePeriod: + description: PruningGracePeriod is the duration to wait + before pruning recommendations for containers that no + longer exist. This is useful for containers created by + Jobs from CronJobs, which are frequently created and deleted. + By setting a grace period, recommendations for these containers + are not pruned immediately after they are removed, providing + recommendations to new containers created by subsequent + Jobs. If not specified, recommendations for non-existent + containers are pruned the next time a recommendation loop + is run. However, if the targetRef points to a CronJob, + the default value is 24 hours. + type: string type: object type: array type: object @@ -363,13 +376,13 @@ spec: grouped by the target resource. properties: apiVersion: - description: API version of the referent + description: apiVersion is the API version of the referent type: string kind: - description: 'Kind of the referent; More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + description: 'kind is the kind of the referent; More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' type: string name: - description: 'Name of the referent; More info: http://kubernetes.io/docs/user-guide/identifiers#names' + description: 'name is the name of the referent; More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' type: string required: - kind @@ -639,13 +652,13 @@ spec: grouped by the target resource. properties: apiVersion: - description: API version of the referent + description: apiVersion is the API version of the referent type: string kind: - description: 'Kind of the referent; More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + description: 'kind is the kind of the referent; More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' type: string name: - description: 'Name of the referent; More info: http://kubernetes.io/docs/user-guide/identifiers#names' + description: 'name is the name of the referent; More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' type: string required: - kind diff --git a/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1/types.go b/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1/types.go index c1a686371e6..68a38f1bc2c 100644 --- a/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1/types.go +++ b/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1/types.go @@ -214,6 +214,15 @@ type ContainerResourcePolicy struct { // The default is "RequestsAndLimits". // +optional ControlledValues *ContainerControlledValues `json:"controlledValues,omitempty" protobuf:"bytes,6,rep,name=controlledValues"` + + // PruningGracePeriod is the duration to wait before pruning recommendations for containers that no longer exist. + // This is useful for containers created by Jobs from CronJobs, which are frequently created and deleted. + // By setting a grace period, recommendations for these containers are not pruned immediately + // after they are removed, providing recommendations to new containers created by subsequent Jobs. + // If not specified, recommendations for non-existent containers are pruned the next time a recommendation + // loop is run. However, if the targetRef points to a CronJob, the default value is 24 hours. + // +optional + PruningGracePeriod *metav1.Duration `json:"pruningGracePeriod,omitempty" protobuf:"bytes,4,opt,name=pruningGracePeriod"` } const ( diff --git a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go index dd4a2fa8cdc..30e6397b250 100644 --- a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go +++ b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go @@ -441,10 +441,10 @@ func (feeder *clusterStateFeeder) SweepAggregates() { // is still a reference to them in feeder.clusterState.aggregateStateMap, and those get // garbage collected eventually by the rate limited aggregate garbage collector later. // Maybe we should clean those up here too since we know which ones are stale? + now := time.Now() for _, vpa := range feeder.clusterState.Vpas { - for containerKey, container := range vpa.AggregateContainerStates() { - if !container.IsUnderVPA { + if !container.IsUnderVPA && now.After(container.GetLastUpdate().Add(container.GetPruningGracePeriod().Duration)) { klog.V(4).InfoS("Deleting Aggregate for VPA: container no longer present", "namespace", vpa.ID.Namespace, "vpaName", vpa.ID.VpaName, @@ -454,7 +454,7 @@ func (feeder *clusterStateFeeder) SweepAggregates() { } } for containerKey, container := range vpa.ContainersInitialAggregateState { - if !container.IsUnderVPA { + if !container.IsUnderVPA && now.After(container.GetLastUpdate().Add(container.GetPruningGracePeriod().Duration)) { klog.V(4).InfoS("Deleting Initial Aggregate for VPA: container no longer present", "namespace", vpa.ID.Namespace, "vpaName", vpa.ID.VpaName, diff --git a/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go b/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go index 8ee9cc8637d..061a207a12b 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go @@ -110,6 +110,8 @@ type AggregateContainerState struct { UpdateMode *vpa_types.UpdateMode ScalingMode *vpa_types.ContainerScalingMode ControlledResources *[]ResourceName + LastUpdateTime time.Time + PruningGracePeriod metav1.Duration } // GetLastRecommendation returns last recorded recommendation. @@ -143,6 +145,16 @@ func (a *AggregateContainerState) GetControlledResources() []ResourceName { return DefaultControlledResources } +// GetLastUpdate returns the time of the last update of the VPA object controlling this aggregator. +func (a *AggregateContainerState) GetLastUpdate() time.Time { + return a.LastUpdateTime +} + +// GetPruningGracePeriod returns the pruning grace period set in the VPA object controlling this aggregator. +func (a *AggregateContainerState) GetPruningGracePeriod() metav1.Duration { + return a.PruningGracePeriod +} + // MarkNotAutoscaled registers that this container state is not controlled by // a VPA object. func (a *AggregateContainerState) MarkNotAutoscaled() { @@ -171,10 +183,12 @@ func (a *AggregateContainerState) MergeContainerState(other *AggregateContainerS // NewAggregateContainerState returns a new, empty AggregateContainerState. func NewAggregateContainerState() *AggregateContainerState { config := GetAggregationsConfig() + now := time.Now() return &AggregateContainerState{ AggregateCPUUsage: util.NewDecayingHistogram(config.CPUHistogramOptions, config.CPUHistogramDecayHalfLife), AggregateMemoryPeaks: util.NewDecayingHistogram(config.MemoryHistogramOptions, config.MemoryHistogramDecayHalfLife), - CreationTime: time.Now(), + CreationTime: now, + LastUpdateTime: now, } } diff --git a/vertical-pod-autoscaler/pkg/recommender/model/cluster.go b/vertical-pod-autoscaler/pkg/recommender/model/cluster.go index 71fea092f5d..4a7abf06f71 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/cluster.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/cluster.go @@ -219,6 +219,8 @@ func (cluster *ClusterState) AddOrUpdateContainer(containerID ContainerID, reque container.Request = request // Mark this container as still managed so the aggregates don't get garbage collected aggregateState.IsUnderVPA = true + // Update the last update time so we potentially don't garbage collect it too soon + aggregateState.LastUpdateTime = time.Now() } return nil } @@ -284,7 +286,7 @@ func (cluster *ClusterState) AddOrUpdateVpa(apiObject *vpa_types.VerticalPodAuto vpaExists = false } if !vpaExists { - vpa = NewVpa(vpaID, selector, apiObject.CreationTimestamp.Time) + vpa = NewVpa(vpaID, selector, apiObject.Spec.TargetRef, apiObject.CreationTimestamp.Time) cluster.Vpas[vpaID] = vpa for aggregationKey, aggregation := range cluster.aggregateStateMap { vpa.UseAggregationIfMatching(aggregationKey, aggregation) diff --git a/vertical-pod-autoscaler/pkg/recommender/model/vpa.go b/vertical-pod-autoscaler/pkg/recommender/model/vpa.go index 069a16230ed..986784e7a64 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/vpa.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/vpa.go @@ -119,7 +119,7 @@ type Vpa struct { // NewVpa returns a new Vpa with a given ID and pod selector. Doesn't set the // links to the matched aggregations. -func NewVpa(id VpaID, selector labels.Selector, created time.Time) *Vpa { +func NewVpa(id VpaID, selector labels.Selector, targetRef *autoscaling.CrossVersionObjectReference, created time.Time) *Vpa { vpa := &Vpa{ ID: id, PodSelector: selector, @@ -128,6 +128,7 @@ func NewVpa(id VpaID, selector labels.Selector, created time.Time) *Vpa { Created: created, Annotations: make(vpaAnnotationsMap), Conditions: make(vpaConditionsMap), + TargetRef: targetRef, // APIVersion defaults to the version of the client used to read resources. // If a new version is introduced that needs to be differentiated beyond the // client conversion, this needs to be done based on the resource content. @@ -160,6 +161,7 @@ func (vpa *Vpa) UseAggregationIfMatching(aggregationKey AggregateStateKey, aggre vpa.aggregateContainerStates[aggregationKey] = aggregation aggregation.IsUnderVPA = true aggregation.UpdateMode = vpa.UpdateMode + aggregation.PruningGracePeriod = vpa_api_util.GetContainerPruningGracePeriod(aggregationKey.ContainerName(), vpa.ResourcePolicy, vpa.TargetRef) aggregation.UpdateFromPolicy(vpa_api_util.GetContainerResourcePolicy(aggregationKey.ContainerName(), vpa.ResourcePolicy)) } } diff --git a/vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go b/vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go index cdc8d0fee1a..58170953160 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + autoscaling "k8s.io/api/autoscaling/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/labels" @@ -30,14 +31,20 @@ import ( ) var ( - anyTime = time.Unix(0, 0) + anyTime = time.Unix(0, 0) + regularTargetRef = &autoscaling.CrossVersionObjectReference{ + Kind: "Deployment", + Name: "test-deployment", + APIVersion: "apps/v1", + } + // TODO(maxcao13): write tests for new container policy field ) func TestMergeAggregateContainerState(t *testing.T) { containersInitialAggregateState := ContainerNameToAggregateStateMap{} containersInitialAggregateState["test"] = NewAggregateContainerState() - vpa := NewVpa(VpaID{}, nil, anyTime) + vpa := NewVpa(VpaID{}, nil, nil, anyTime) vpa.ContainersInitialAggregateState = containersInitialAggregateState containerNameToAggregateStateMap := ContainerNameToAggregateStateMap{} @@ -119,7 +126,7 @@ func TestUpdateConditions(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { containerName := "container" - vpa := NewVpa(VpaID{Namespace: "test-namespace", VpaName: "my-favourite-vpa"}, labels.Nothing(), time.Unix(0, 0)) + vpa := NewVpa(VpaID{Namespace: "test-namespace", VpaName: "my-favourite-vpa"}, labels.Nothing(), nil, time.Unix(0, 0)) if tc.hasRecommendation { vpa.Recommendation = test.Recommendation().WithContainer(containerName).WithTarget("5", "200").Get() } @@ -189,7 +196,7 @@ func TestUpdateRecommendation(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { namespace := "test-namespace" - vpa := NewVpa(VpaID{Namespace: namespace, VpaName: "my-favourite-vpa"}, labels.Nothing(), anyTime) + vpa := NewVpa(VpaID{Namespace: namespace, VpaName: "my-favourite-vpa"}, labels.Nothing(), regularTargetRef, anyTime) for container, rec := range tc.containers { state := &AggregateContainerState{} if rec != nil { @@ -355,7 +362,7 @@ func TestUseAggregationIfMatching(t *testing.T) { if !assert.NoError(t, err) { t.FailNow() } - vpa := NewVpa(VpaID{Namespace: namespace, VpaName: "my-favourite-vpa"}, selector, anyTime) + vpa := NewVpa(VpaID{Namespace: namespace, VpaName: "my-favourite-vpa"}, selector, regularTargetRef, anyTime) vpa.UpdateMode = tc.updateMode key := mockAggregateStateKey{ namespace: namespace, @@ -542,7 +549,7 @@ func TestSetResourcePolicy(t *testing.T) { if !assert.NoError(t, err) { t.FailNow() } - vpa := NewVpa(VpaID{Namespace: "test-namespace", VpaName: "my-favourite-vpa"}, selector, anyTime) + vpa := NewVpa(VpaID{Namespace: "test-namespace", VpaName: "my-favourite-vpa"}, selector, regularTargetRef, anyTime) for _, container := range tc.containers { containerKey, aggregation := testAggregation(vpa, container, labels.Set(testLabels).String()) vpa.aggregateContainerStates[containerKey] = aggregation diff --git a/vertical-pod-autoscaler/pkg/utils/vpa/api.go b/vertical-pod-autoscaler/pkg/utils/vpa/api.go index 8fbf11c9835..22d3995c8ca 100644 --- a/vertical-pod-autoscaler/pkg/utils/vpa/api.go +++ b/vertical-pod-autoscaler/pkg/utils/vpa/api.go @@ -23,6 +23,7 @@ import ( "strings" "time" + autoscaling "k8s.io/api/autoscaling/v1" core "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" meta "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -218,6 +219,22 @@ func GetContainerControlledValues(name string, vpaResourcePolicy *vpa_types.PodR return *containerPolicy.ControlledValues } +// GetContainerPruningGracePeriod returns the pruning grace period for a container. +func GetContainerPruningGracePeriod(containerName string, vpaResourcePolicy *vpa_types.PodResourcePolicy, targetRef *autoscaling.CrossVersionObjectReference) meta.Duration { + containerPolicy := GetContainerResourcePolicy(containerName, vpaResourcePolicy) + if containerPolicy == nil || containerPolicy.PruningGracePeriod == nil { + defaultGracePeriod := meta.Duration{Duration: time.Duration(0)} + if targetRef != nil && targetRef.Kind == "CronJob" { + // CronJob is a special case, because they create containers they are usually supposed to be deleted after the job is done. + // So we set a higher default grace period so that future recommendations for the same workload are not pruned too early. + // TODO(maxcao13): maybe it makes sense to set the default based on the cron schedule? + defaultGracePeriod = meta.Duration{Duration: 24 * time.Hour} + } + return defaultGracePeriod + } + return *containerPolicy.PruningGracePeriod +} + // CreateOrUpdateVpaCheckpoint updates the status field of the VPA Checkpoint API object. // If object doesn't exits it is created. func CreateOrUpdateVpaCheckpoint(vpaCheckpointClient vpa_api.VerticalPodAutoscalerCheckpointInterface, From 808c62b8cfaf2411c1bde0ba6b49b2d97c8c8d10 Mon Sep 17 00:00:00 2001 From: Max Cao Date: Tue, 17 Dec 2024 13:38:01 -0800 Subject: [PATCH 7/7] Add globalPruningGracePeriod flag which defaults to a long time for non-breaking opt-in change Signed-off-by: Max Cao --- .../pkg/recommender/input/cluster_feeder.go | 4 +-- .../model/aggregate_container_state.go | 27 ++++++++++++------- .../pkg/recommender/model/cluster.go | 24 ++++++++++++----- .../pkg/recommender/model/vpa.go | 5 ++-- .../pkg/recommender/model/vpa_test.go | 18 +++++-------- vertical-pod-autoscaler/pkg/utils/vpa/api.go | 16 +++-------- 6 files changed, 49 insertions(+), 45 deletions(-) diff --git a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go index 30e6397b250..ffcad65d24c 100644 --- a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go +++ b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go @@ -444,7 +444,7 @@ func (feeder *clusterStateFeeder) SweepAggregates() { now := time.Now() for _, vpa := range feeder.clusterState.Vpas { for containerKey, container := range vpa.AggregateContainerStates() { - if !container.IsUnderVPA && now.After(container.GetLastUpdate().Add(container.GetPruningGracePeriod().Duration)) { + if !container.IsUnderVPA && container.IsAggregateStale(now) { klog.V(4).InfoS("Deleting Aggregate for VPA: container no longer present", "namespace", vpa.ID.Namespace, "vpaName", vpa.ID.VpaName, @@ -454,7 +454,7 @@ func (feeder *clusterStateFeeder) SweepAggregates() { } } for containerKey, container := range vpa.ContainersInitialAggregateState { - if !container.IsUnderVPA && now.After(container.GetLastUpdate().Add(container.GetPruningGracePeriod().Duration)) { + if !container.IsUnderVPA && container.IsAggregateStale(now) { klog.V(4).InfoS("Deleting Initial Aggregate for VPA: container no longer present", "namespace", vpa.ID.Namespace, "vpaName", vpa.ID.VpaName, diff --git a/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go b/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go index 061a207a12b..76014778abb 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go @@ -36,6 +36,7 @@ limitations under the License. package model import ( + "flag" "fmt" "math" "time" @@ -46,6 +47,10 @@ import ( "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/util" ) +var ( + globalPruningGracePeriodDuration = flag.Duration("pruning-grace-period-duration", 24*365*100*time.Hour, `The grace period for deleting stale aggregates and recommendations. By default, set to 100 years, effectively disabling it.`) +) + // ContainerNameToAggregateStateMap maps a container name to AggregateContainerState // that aggregates state of containers with that name. type ContainerNameToAggregateStateMap map[string]*AggregateContainerState @@ -111,7 +116,7 @@ type AggregateContainerState struct { ScalingMode *vpa_types.ContainerScalingMode ControlledResources *[]ResourceName LastUpdateTime time.Time - PruningGracePeriod metav1.Duration + PruningGracePeriod time.Duration } // GetLastRecommendation returns last recorded recommendation. @@ -145,14 +150,9 @@ func (a *AggregateContainerState) GetControlledResources() []ResourceName { return DefaultControlledResources } -// GetLastUpdate returns the time of the last update of the VPA object controlling this aggregator. -func (a *AggregateContainerState) GetLastUpdate() time.Time { - return a.LastUpdateTime -} - -// GetPruningGracePeriod returns the pruning grace period set in the VPA object controlling this aggregator. -func (a *AggregateContainerState) GetPruningGracePeriod() metav1.Duration { - return a.PruningGracePeriod +// IsAggregateStale returns true if the last update time is past its grace period and the aggregate should be pruned. +func (a *AggregateContainerState) IsAggregateStale(now time.Time) bool { + return now.After(a.LastUpdateTime.Add(a.PruningGracePeriod)) } // MarkNotAutoscaled registers that this container state is not controlled by @@ -303,6 +303,15 @@ func (a *AggregateContainerState) UpdateFromPolicy(resourcePolicy *vpa_types.Con } } +// UpdatePruningGracePeriod updates the an aggregate state with a containerPruningGracePeriod or the global pruning duration if nil. +func (a *AggregateContainerState) UpdatePruningGracePeriod(containerPruningGracePeriod *metav1.Duration) { + if containerPruningGracePeriod != nil { + a.PruningGracePeriod = containerPruningGracePeriod.Duration + } else { + a.PruningGracePeriod = *globalPruningGracePeriodDuration + } +} + // AggregateStateByContainerName takes a set of AggregateContainerStates and merge them // grouping by the container name. The result is a map from the container name to the aggregation // from all input containers with the given name. diff --git a/vertical-pod-autoscaler/pkg/recommender/model/cluster.go b/vertical-pod-autoscaler/pkg/recommender/model/cluster.go index 4a7abf06f71..4ea8fde839c 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/cluster.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/cluster.go @@ -277,16 +277,26 @@ func (cluster *ClusterState) AddOrUpdateVpa(apiObject *vpa_types.VerticalPodAuto } vpa, vpaExists := cluster.Vpas[vpaID] - if vpaExists && (vpa.PodSelector.String() != selector.String()) { - // Pod selector was changed. Delete the VPA object and recreate - // it with the new selector. - if err := cluster.DeleteVpa(vpaID); err != nil { - return err + if vpaExists { + if vpa.PodSelector.String() != selector.String() { + // Pod selector was changed. Delete the VPA object and recreate + // it with the new selector. + if err := cluster.DeleteVpa(vpaID); err != nil { + return err + } + + vpaExists = false + } else { + // Update the pruningGracePeriod to ensure a potential new grace period is applied. + // This prevents an old, excessively long grace period from persisting and + // potentially causing the VPA to keep stale aggregates with an outdated grace period. + for key, containerState := range vpa.aggregateContainerStates { + containerState.UpdatePruningGracePeriod(vpa_utils.GetContainerPruningGracePeriod(key.ContainerName(), apiObject.Spec.ResourcePolicy)) + } } - vpaExists = false } if !vpaExists { - vpa = NewVpa(vpaID, selector, apiObject.Spec.TargetRef, apiObject.CreationTimestamp.Time) + vpa = NewVpa(vpaID, selector, apiObject.CreationTimestamp.Time) cluster.Vpas[vpaID] = vpa for aggregationKey, aggregation := range cluster.aggregateStateMap { vpa.UseAggregationIfMatching(aggregationKey, aggregation) diff --git a/vertical-pod-autoscaler/pkg/recommender/model/vpa.go b/vertical-pod-autoscaler/pkg/recommender/model/vpa.go index 986784e7a64..fc3d54c0350 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/vpa.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/vpa.go @@ -119,7 +119,7 @@ type Vpa struct { // NewVpa returns a new Vpa with a given ID and pod selector. Doesn't set the // links to the matched aggregations. -func NewVpa(id VpaID, selector labels.Selector, targetRef *autoscaling.CrossVersionObjectReference, created time.Time) *Vpa { +func NewVpa(id VpaID, selector labels.Selector, created time.Time) *Vpa { vpa := &Vpa{ ID: id, PodSelector: selector, @@ -128,7 +128,6 @@ func NewVpa(id VpaID, selector labels.Selector, targetRef *autoscaling.CrossVers Created: created, Annotations: make(vpaAnnotationsMap), Conditions: make(vpaConditionsMap), - TargetRef: targetRef, // APIVersion defaults to the version of the client used to read resources. // If a new version is introduced that needs to be differentiated beyond the // client conversion, this needs to be done based on the resource content. @@ -161,7 +160,7 @@ func (vpa *Vpa) UseAggregationIfMatching(aggregationKey AggregateStateKey, aggre vpa.aggregateContainerStates[aggregationKey] = aggregation aggregation.IsUnderVPA = true aggregation.UpdateMode = vpa.UpdateMode - aggregation.PruningGracePeriod = vpa_api_util.GetContainerPruningGracePeriod(aggregationKey.ContainerName(), vpa.ResourcePolicy, vpa.TargetRef) + aggregation.UpdatePruningGracePeriod(vpa_api_util.GetContainerPruningGracePeriod(aggregationKey.ContainerName(), vpa.ResourcePolicy)) aggregation.UpdateFromPolicy(vpa_api_util.GetContainerResourcePolicy(aggregationKey.ContainerName(), vpa.ResourcePolicy)) } } diff --git a/vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go b/vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go index 58170953160..11e690b5ab2 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go @@ -20,7 +20,6 @@ import ( "testing" "time" - autoscaling "k8s.io/api/autoscaling/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/labels" @@ -31,12 +30,7 @@ import ( ) var ( - anyTime = time.Unix(0, 0) - regularTargetRef = &autoscaling.CrossVersionObjectReference{ - Kind: "Deployment", - Name: "test-deployment", - APIVersion: "apps/v1", - } + anyTime = time.Unix(0, 0) // TODO(maxcao13): write tests for new container policy field ) @@ -44,7 +38,7 @@ func TestMergeAggregateContainerState(t *testing.T) { containersInitialAggregateState := ContainerNameToAggregateStateMap{} containersInitialAggregateState["test"] = NewAggregateContainerState() - vpa := NewVpa(VpaID{}, nil, nil, anyTime) + vpa := NewVpa(VpaID{}, nil, anyTime) vpa.ContainersInitialAggregateState = containersInitialAggregateState containerNameToAggregateStateMap := ContainerNameToAggregateStateMap{} @@ -126,7 +120,7 @@ func TestUpdateConditions(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { containerName := "container" - vpa := NewVpa(VpaID{Namespace: "test-namespace", VpaName: "my-favourite-vpa"}, labels.Nothing(), nil, time.Unix(0, 0)) + vpa := NewVpa(VpaID{Namespace: "test-namespace", VpaName: "my-favourite-vpa"}, labels.Nothing(), time.Unix(0, 0)) if tc.hasRecommendation { vpa.Recommendation = test.Recommendation().WithContainer(containerName).WithTarget("5", "200").Get() } @@ -196,7 +190,7 @@ func TestUpdateRecommendation(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { namespace := "test-namespace" - vpa := NewVpa(VpaID{Namespace: namespace, VpaName: "my-favourite-vpa"}, labels.Nothing(), regularTargetRef, anyTime) + vpa := NewVpa(VpaID{Namespace: namespace, VpaName: "my-favourite-vpa"}, labels.Nothing(), anyTime) for container, rec := range tc.containers { state := &AggregateContainerState{} if rec != nil { @@ -362,7 +356,7 @@ func TestUseAggregationIfMatching(t *testing.T) { if !assert.NoError(t, err) { t.FailNow() } - vpa := NewVpa(VpaID{Namespace: namespace, VpaName: "my-favourite-vpa"}, selector, regularTargetRef, anyTime) + vpa := NewVpa(VpaID{Namespace: namespace, VpaName: "my-favourite-vpa"}, selector, anyTime) vpa.UpdateMode = tc.updateMode key := mockAggregateStateKey{ namespace: namespace, @@ -549,7 +543,7 @@ func TestSetResourcePolicy(t *testing.T) { if !assert.NoError(t, err) { t.FailNow() } - vpa := NewVpa(VpaID{Namespace: "test-namespace", VpaName: "my-favourite-vpa"}, selector, regularTargetRef, anyTime) + vpa := NewVpa(VpaID{Namespace: "test-namespace", VpaName: "my-favourite-vpa"}, selector, anyTime) for _, container := range tc.containers { containerKey, aggregation := testAggregation(vpa, container, labels.Set(testLabels).String()) vpa.aggregateContainerStates[containerKey] = aggregation diff --git a/vertical-pod-autoscaler/pkg/utils/vpa/api.go b/vertical-pod-autoscaler/pkg/utils/vpa/api.go index 22d3995c8ca..ebd2afc566c 100644 --- a/vertical-pod-autoscaler/pkg/utils/vpa/api.go +++ b/vertical-pod-autoscaler/pkg/utils/vpa/api.go @@ -23,7 +23,6 @@ import ( "strings" "time" - autoscaling "k8s.io/api/autoscaling/v1" core "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" meta "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -220,19 +219,12 @@ func GetContainerControlledValues(name string, vpaResourcePolicy *vpa_types.PodR } // GetContainerPruningGracePeriod returns the pruning grace period for a container. -func GetContainerPruningGracePeriod(containerName string, vpaResourcePolicy *vpa_types.PodResourcePolicy, targetRef *autoscaling.CrossVersionObjectReference) meta.Duration { +func GetContainerPruningGracePeriod(containerName string, vpaResourcePolicy *vpa_types.PodResourcePolicy) (gracePeriod *meta.Duration) { containerPolicy := GetContainerResourcePolicy(containerName, vpaResourcePolicy) - if containerPolicy == nil || containerPolicy.PruningGracePeriod == nil { - defaultGracePeriod := meta.Duration{Duration: time.Duration(0)} - if targetRef != nil && targetRef.Kind == "CronJob" { - // CronJob is a special case, because they create containers they are usually supposed to be deleted after the job is done. - // So we set a higher default grace period so that future recommendations for the same workload are not pruned too early. - // TODO(maxcao13): maybe it makes sense to set the default based on the cron schedule? - defaultGracePeriod = meta.Duration{Duration: 24 * time.Hour} - } - return defaultGracePeriod + if containerPolicy != nil { + gracePeriod = containerPolicy.PruningGracePeriod } - return *containerPolicy.PruningGracePeriod + return } // CreateOrUpdateVpaCheckpoint updates the status field of the VPA Checkpoint API object.