From 098c1383b6945483aed232ada5b2517d08e7c329 Mon Sep 17 00:00:00 2001 From: Max Cao Date: Tue, 17 Dec 2024 13:38:01 -0800 Subject: [PATCH] Add globalPruningGracePeriod flag which defaults to a long time for non-breaking opt-in change Signed-off-by: Max Cao --- .../pkg/recommender/input/cluster_feeder.go | 4 +-- .../model/aggregate_container_state.go | 26 ++++++++++++------- .../pkg/recommender/model/cluster.go | 24 ++++++++++++----- .../pkg/recommender/model/vpa.go | 5 ++-- .../pkg/recommender/model/vpa_test.go | 18 +++++-------- vertical-pod-autoscaler/pkg/utils/vpa/api.go | 16 +++--------- 6 files changed, 48 insertions(+), 45 deletions(-) diff --git a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go index 30e6397b250..ffcad65d24c 100644 --- a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go +++ b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go @@ -444,7 +444,7 @@ func (feeder *clusterStateFeeder) SweepAggregates() { now := time.Now() for _, vpa := range feeder.clusterState.Vpas { for containerKey, container := range vpa.AggregateContainerStates() { - if !container.IsUnderVPA && now.After(container.GetLastUpdate().Add(container.GetPruningGracePeriod().Duration)) { + if !container.IsUnderVPA && container.IsAggregateStale(now) { klog.V(4).InfoS("Deleting Aggregate for VPA: container no longer present", "namespace", vpa.ID.Namespace, "vpaName", vpa.ID.VpaName, @@ -454,7 +454,7 @@ func (feeder *clusterStateFeeder) SweepAggregates() { } } for containerKey, container := range vpa.ContainersInitialAggregateState { - if !container.IsUnderVPA && now.After(container.GetLastUpdate().Add(container.GetPruningGracePeriod().Duration)) { + if !container.IsUnderVPA && container.IsAggregateStale(now) { klog.V(4).InfoS("Deleting Initial Aggregate for VPA: container no longer present", "namespace", vpa.ID.Namespace, "vpaName", vpa.ID.VpaName, diff --git a/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go b/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go index 061a207a12b..6041ac90cfe 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go @@ -36,6 +36,7 @@ limitations under the License. package model import ( + "flag" "fmt" "math" "time" @@ -46,6 +47,10 @@ import ( "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/util" ) +var ( + globalPruningGracePeriodDuration = flag.Duration("pruning-grace-period-duration", 24*365*100*time.Hour, `The grace period for deleting stale aggregates and recommendations. By default, set to 100 years, effectively disabling it.`) +) + // ContainerNameToAggregateStateMap maps a container name to AggregateContainerState // that aggregates state of containers with that name. type ContainerNameToAggregateStateMap map[string]*AggregateContainerState @@ -111,7 +116,7 @@ type AggregateContainerState struct { ScalingMode *vpa_types.ContainerScalingMode ControlledResources *[]ResourceName LastUpdateTime time.Time - PruningGracePeriod metav1.Duration + PruningGracePeriod time.Duration } // GetLastRecommendation returns last recorded recommendation. @@ -145,14 +150,9 @@ func (a *AggregateContainerState) GetControlledResources() []ResourceName { return DefaultControlledResources } -// GetLastUpdate returns the time of the last update of the VPA object controlling this aggregator. -func (a *AggregateContainerState) GetLastUpdate() time.Time { - return a.LastUpdateTime -} - -// GetPruningGracePeriod returns the pruning grace period set in the VPA object controlling this aggregator. -func (a *AggregateContainerState) GetPruningGracePeriod() metav1.Duration { - return a.PruningGracePeriod +// IsAggregateStale returns true if the last update time is past its grace period and the aggregate should be pruned. +func (a *AggregateContainerState) IsAggregateStale(now time.Time) bool { + return now.After(a.LastUpdateTime.Add(a.PruningGracePeriod)) } // MarkNotAutoscaled registers that this container state is not controlled by @@ -303,6 +303,14 @@ func (a *AggregateContainerState) UpdateFromPolicy(resourcePolicy *vpa_types.Con } } +func (a *AggregateContainerState) UpdatePruningGracePeriod(containerPruningGracePeriod *metav1.Duration) { + if containerPruningGracePeriod != nil { + a.PruningGracePeriod = containerPruningGracePeriod.Duration + } else { + a.PruningGracePeriod = *globalPruningGracePeriodDuration + } +} + // AggregateStateByContainerName takes a set of AggregateContainerStates and merge them // grouping by the container name. The result is a map from the container name to the aggregation // from all input containers with the given name. diff --git a/vertical-pod-autoscaler/pkg/recommender/model/cluster.go b/vertical-pod-autoscaler/pkg/recommender/model/cluster.go index 4a7abf06f71..4ea8fde839c 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/cluster.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/cluster.go @@ -277,16 +277,26 @@ func (cluster *ClusterState) AddOrUpdateVpa(apiObject *vpa_types.VerticalPodAuto } vpa, vpaExists := cluster.Vpas[vpaID] - if vpaExists && (vpa.PodSelector.String() != selector.String()) { - // Pod selector was changed. Delete the VPA object and recreate - // it with the new selector. - if err := cluster.DeleteVpa(vpaID); err != nil { - return err + if vpaExists { + if vpa.PodSelector.String() != selector.String() { + // Pod selector was changed. Delete the VPA object and recreate + // it with the new selector. + if err := cluster.DeleteVpa(vpaID); err != nil { + return err + } + + vpaExists = false + } else { + // Update the pruningGracePeriod to ensure a potential new grace period is applied. + // This prevents an old, excessively long grace period from persisting and + // potentially causing the VPA to keep stale aggregates with an outdated grace period. + for key, containerState := range vpa.aggregateContainerStates { + containerState.UpdatePruningGracePeriod(vpa_utils.GetContainerPruningGracePeriod(key.ContainerName(), apiObject.Spec.ResourcePolicy)) + } } - vpaExists = false } if !vpaExists { - vpa = NewVpa(vpaID, selector, apiObject.Spec.TargetRef, apiObject.CreationTimestamp.Time) + vpa = NewVpa(vpaID, selector, apiObject.CreationTimestamp.Time) cluster.Vpas[vpaID] = vpa for aggregationKey, aggregation := range cluster.aggregateStateMap { vpa.UseAggregationIfMatching(aggregationKey, aggregation) diff --git a/vertical-pod-autoscaler/pkg/recommender/model/vpa.go b/vertical-pod-autoscaler/pkg/recommender/model/vpa.go index 986784e7a64..fc3d54c0350 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/vpa.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/vpa.go @@ -119,7 +119,7 @@ type Vpa struct { // NewVpa returns a new Vpa with a given ID and pod selector. Doesn't set the // links to the matched aggregations. -func NewVpa(id VpaID, selector labels.Selector, targetRef *autoscaling.CrossVersionObjectReference, created time.Time) *Vpa { +func NewVpa(id VpaID, selector labels.Selector, created time.Time) *Vpa { vpa := &Vpa{ ID: id, PodSelector: selector, @@ -128,7 +128,6 @@ func NewVpa(id VpaID, selector labels.Selector, targetRef *autoscaling.CrossVers Created: created, Annotations: make(vpaAnnotationsMap), Conditions: make(vpaConditionsMap), - TargetRef: targetRef, // APIVersion defaults to the version of the client used to read resources. // If a new version is introduced that needs to be differentiated beyond the // client conversion, this needs to be done based on the resource content. @@ -161,7 +160,7 @@ func (vpa *Vpa) UseAggregationIfMatching(aggregationKey AggregateStateKey, aggre vpa.aggregateContainerStates[aggregationKey] = aggregation aggregation.IsUnderVPA = true aggregation.UpdateMode = vpa.UpdateMode - aggregation.PruningGracePeriod = vpa_api_util.GetContainerPruningGracePeriod(aggregationKey.ContainerName(), vpa.ResourcePolicy, vpa.TargetRef) + aggregation.UpdatePruningGracePeriod(vpa_api_util.GetContainerPruningGracePeriod(aggregationKey.ContainerName(), vpa.ResourcePolicy)) aggregation.UpdateFromPolicy(vpa_api_util.GetContainerResourcePolicy(aggregationKey.ContainerName(), vpa.ResourcePolicy)) } } diff --git a/vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go b/vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go index 58170953160..11e690b5ab2 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go @@ -20,7 +20,6 @@ import ( "testing" "time" - autoscaling "k8s.io/api/autoscaling/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/labels" @@ -31,12 +30,7 @@ import ( ) var ( - anyTime = time.Unix(0, 0) - regularTargetRef = &autoscaling.CrossVersionObjectReference{ - Kind: "Deployment", - Name: "test-deployment", - APIVersion: "apps/v1", - } + anyTime = time.Unix(0, 0) // TODO(maxcao13): write tests for new container policy field ) @@ -44,7 +38,7 @@ func TestMergeAggregateContainerState(t *testing.T) { containersInitialAggregateState := ContainerNameToAggregateStateMap{} containersInitialAggregateState["test"] = NewAggregateContainerState() - vpa := NewVpa(VpaID{}, nil, nil, anyTime) + vpa := NewVpa(VpaID{}, nil, anyTime) vpa.ContainersInitialAggregateState = containersInitialAggregateState containerNameToAggregateStateMap := ContainerNameToAggregateStateMap{} @@ -126,7 +120,7 @@ func TestUpdateConditions(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { containerName := "container" - vpa := NewVpa(VpaID{Namespace: "test-namespace", VpaName: "my-favourite-vpa"}, labels.Nothing(), nil, time.Unix(0, 0)) + vpa := NewVpa(VpaID{Namespace: "test-namespace", VpaName: "my-favourite-vpa"}, labels.Nothing(), time.Unix(0, 0)) if tc.hasRecommendation { vpa.Recommendation = test.Recommendation().WithContainer(containerName).WithTarget("5", "200").Get() } @@ -196,7 +190,7 @@ func TestUpdateRecommendation(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { namespace := "test-namespace" - vpa := NewVpa(VpaID{Namespace: namespace, VpaName: "my-favourite-vpa"}, labels.Nothing(), regularTargetRef, anyTime) + vpa := NewVpa(VpaID{Namespace: namespace, VpaName: "my-favourite-vpa"}, labels.Nothing(), anyTime) for container, rec := range tc.containers { state := &AggregateContainerState{} if rec != nil { @@ -362,7 +356,7 @@ func TestUseAggregationIfMatching(t *testing.T) { if !assert.NoError(t, err) { t.FailNow() } - vpa := NewVpa(VpaID{Namespace: namespace, VpaName: "my-favourite-vpa"}, selector, regularTargetRef, anyTime) + vpa := NewVpa(VpaID{Namespace: namespace, VpaName: "my-favourite-vpa"}, selector, anyTime) vpa.UpdateMode = tc.updateMode key := mockAggregateStateKey{ namespace: namespace, @@ -549,7 +543,7 @@ func TestSetResourcePolicy(t *testing.T) { if !assert.NoError(t, err) { t.FailNow() } - vpa := NewVpa(VpaID{Namespace: "test-namespace", VpaName: "my-favourite-vpa"}, selector, regularTargetRef, anyTime) + vpa := NewVpa(VpaID{Namespace: "test-namespace", VpaName: "my-favourite-vpa"}, selector, anyTime) for _, container := range tc.containers { containerKey, aggregation := testAggregation(vpa, container, labels.Set(testLabels).String()) vpa.aggregateContainerStates[containerKey] = aggregation diff --git a/vertical-pod-autoscaler/pkg/utils/vpa/api.go b/vertical-pod-autoscaler/pkg/utils/vpa/api.go index 22d3995c8ca..ebd2afc566c 100644 --- a/vertical-pod-autoscaler/pkg/utils/vpa/api.go +++ b/vertical-pod-autoscaler/pkg/utils/vpa/api.go @@ -23,7 +23,6 @@ import ( "strings" "time" - autoscaling "k8s.io/api/autoscaling/v1" core "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" meta "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -220,19 +219,12 @@ func GetContainerControlledValues(name string, vpaResourcePolicy *vpa_types.PodR } // GetContainerPruningGracePeriod returns the pruning grace period for a container. -func GetContainerPruningGracePeriod(containerName string, vpaResourcePolicy *vpa_types.PodResourcePolicy, targetRef *autoscaling.CrossVersionObjectReference) meta.Duration { +func GetContainerPruningGracePeriod(containerName string, vpaResourcePolicy *vpa_types.PodResourcePolicy) (gracePeriod *meta.Duration) { containerPolicy := GetContainerResourcePolicy(containerName, vpaResourcePolicy) - if containerPolicy == nil || containerPolicy.PruningGracePeriod == nil { - defaultGracePeriod := meta.Duration{Duration: time.Duration(0)} - if targetRef != nil && targetRef.Kind == "CronJob" { - // CronJob is a special case, because they create containers they are usually supposed to be deleted after the job is done. - // So we set a higher default grace period so that future recommendations for the same workload are not pruned too early. - // TODO(maxcao13): maybe it makes sense to set the default based on the cron schedule? - defaultGracePeriod = meta.Duration{Duration: 24 * time.Hour} - } - return defaultGracePeriod + if containerPolicy != nil { + gracePeriod = containerPolicy.PruningGracePeriod } - return *containerPolicy.PruningGracePeriod + return } // CreateOrUpdateVpaCheckpoint updates the status field of the VPA Checkpoint API object.