Skip to content

Commit

Permalink
Add globalPruningGracePeriod flag which defaults to a long time for n…
Browse files Browse the repository at this point in the history
…on-breaking opt-in change

Signed-off-by: Max Cao <[email protected]>
  • Loading branch information
maxcao13 committed Dec 17, 2024
1 parent b6e435b commit 098c138
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func (feeder *clusterStateFeeder) SweepAggregates() {
now := time.Now()
for _, vpa := range feeder.clusterState.Vpas {
for containerKey, container := range vpa.AggregateContainerStates() {
if !container.IsUnderVPA && now.After(container.GetLastUpdate().Add(container.GetPruningGracePeriod().Duration)) {
if !container.IsUnderVPA && container.IsAggregateStale(now) {
klog.V(4).InfoS("Deleting Aggregate for VPA: container no longer present",
"namespace", vpa.ID.Namespace,
"vpaName", vpa.ID.VpaName,
Expand All @@ -454,7 +454,7 @@ func (feeder *clusterStateFeeder) SweepAggregates() {
}
}
for containerKey, container := range vpa.ContainersInitialAggregateState {
if !container.IsUnderVPA && now.After(container.GetLastUpdate().Add(container.GetPruningGracePeriod().Duration)) {
if !container.IsUnderVPA && container.IsAggregateStale(now) {
klog.V(4).InfoS("Deleting Initial Aggregate for VPA: container no longer present",
"namespace", vpa.ID.Namespace,
"vpaName", vpa.ID.VpaName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ limitations under the License.
package model

import (
"flag"
"fmt"
"math"
"time"
Expand All @@ -46,6 +47,10 @@ import (
"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/util"
)

var (
globalPruningGracePeriodDuration = flag.Duration("pruning-grace-period-duration", 24*365*100*time.Hour, `The grace period for deleting stale aggregates and recommendations. By default, set to 100 years, effectively disabling it.`)
)

// ContainerNameToAggregateStateMap maps a container name to AggregateContainerState
// that aggregates state of containers with that name.
type ContainerNameToAggregateStateMap map[string]*AggregateContainerState
Expand Down Expand Up @@ -111,7 +116,7 @@ type AggregateContainerState struct {
ScalingMode *vpa_types.ContainerScalingMode
ControlledResources *[]ResourceName
LastUpdateTime time.Time
PruningGracePeriod metav1.Duration
PruningGracePeriod time.Duration
}

// GetLastRecommendation returns last recorded recommendation.
Expand Down Expand Up @@ -145,14 +150,9 @@ func (a *AggregateContainerState) GetControlledResources() []ResourceName {
return DefaultControlledResources
}

// GetLastUpdate returns the time of the last update of the VPA object controlling this aggregator.
func (a *AggregateContainerState) GetLastUpdate() time.Time {
return a.LastUpdateTime
}

// GetPruningGracePeriod returns the pruning grace period set in the VPA object controlling this aggregator.
func (a *AggregateContainerState) GetPruningGracePeriod() metav1.Duration {
return a.PruningGracePeriod
// IsAggregateStale returns true if the last update time is past its grace period and the aggregate should be pruned.
func (a *AggregateContainerState) IsAggregateStale(now time.Time) bool {
return now.After(a.LastUpdateTime.Add(a.PruningGracePeriod))
}

// MarkNotAutoscaled registers that this container state is not controlled by
Expand Down Expand Up @@ -303,6 +303,14 @@ func (a *AggregateContainerState) UpdateFromPolicy(resourcePolicy *vpa_types.Con
}
}

func (a *AggregateContainerState) UpdatePruningGracePeriod(containerPruningGracePeriod *metav1.Duration) {
if containerPruningGracePeriod != nil {
a.PruningGracePeriod = containerPruningGracePeriod.Duration
} else {
a.PruningGracePeriod = *globalPruningGracePeriodDuration
}
}

// AggregateStateByContainerName takes a set of AggregateContainerStates and merge them
// grouping by the container name. The result is a map from the container name to the aggregation
// from all input containers with the given name.
Expand Down
24 changes: 17 additions & 7 deletions vertical-pod-autoscaler/pkg/recommender/model/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,16 +277,26 @@ func (cluster *ClusterState) AddOrUpdateVpa(apiObject *vpa_types.VerticalPodAuto
}

vpa, vpaExists := cluster.Vpas[vpaID]
if vpaExists && (vpa.PodSelector.String() != selector.String()) {
// Pod selector was changed. Delete the VPA object and recreate
// it with the new selector.
if err := cluster.DeleteVpa(vpaID); err != nil {
return err
if vpaExists {
if vpa.PodSelector.String() != selector.String() {
// Pod selector was changed. Delete the VPA object and recreate
// it with the new selector.
if err := cluster.DeleteVpa(vpaID); err != nil {
return err
}

vpaExists = false
} else {
// Update the pruningGracePeriod to ensure a potential new grace period is applied.
// This prevents an old, excessively long grace period from persisting and
// potentially causing the VPA to keep stale aggregates with an outdated grace period.
for key, containerState := range vpa.aggregateContainerStates {
containerState.UpdatePruningGracePeriod(vpa_utils.GetContainerPruningGracePeriod(key.ContainerName(), apiObject.Spec.ResourcePolicy))
}
}
vpaExists = false
}
if !vpaExists {
vpa = NewVpa(vpaID, selector, apiObject.Spec.TargetRef, apiObject.CreationTimestamp.Time)
vpa = NewVpa(vpaID, selector, apiObject.CreationTimestamp.Time)
cluster.Vpas[vpaID] = vpa
for aggregationKey, aggregation := range cluster.aggregateStateMap {
vpa.UseAggregationIfMatching(aggregationKey, aggregation)
Expand Down
5 changes: 2 additions & 3 deletions vertical-pod-autoscaler/pkg/recommender/model/vpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ type Vpa struct {

// NewVpa returns a new Vpa with a given ID and pod selector. Doesn't set the
// links to the matched aggregations.
func NewVpa(id VpaID, selector labels.Selector, targetRef *autoscaling.CrossVersionObjectReference, created time.Time) *Vpa {
func NewVpa(id VpaID, selector labels.Selector, created time.Time) *Vpa {
vpa := &Vpa{
ID: id,
PodSelector: selector,
Expand All @@ -128,7 +128,6 @@ func NewVpa(id VpaID, selector labels.Selector, targetRef *autoscaling.CrossVers
Created: created,
Annotations: make(vpaAnnotationsMap),
Conditions: make(vpaConditionsMap),
TargetRef: targetRef,
// APIVersion defaults to the version of the client used to read resources.
// If a new version is introduced that needs to be differentiated beyond the
// client conversion, this needs to be done based on the resource content.
Expand Down Expand Up @@ -161,7 +160,7 @@ func (vpa *Vpa) UseAggregationIfMatching(aggregationKey AggregateStateKey, aggre
vpa.aggregateContainerStates[aggregationKey] = aggregation
aggregation.IsUnderVPA = true
aggregation.UpdateMode = vpa.UpdateMode
aggregation.PruningGracePeriod = vpa_api_util.GetContainerPruningGracePeriod(aggregationKey.ContainerName(), vpa.ResourcePolicy, vpa.TargetRef)
aggregation.UpdatePruningGracePeriod(vpa_api_util.GetContainerPruningGracePeriod(aggregationKey.ContainerName(), vpa.ResourcePolicy))
aggregation.UpdateFromPolicy(vpa_api_util.GetContainerResourcePolicy(aggregationKey.ContainerName(), vpa.ResourcePolicy))
}
}
Expand Down
18 changes: 6 additions & 12 deletions vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"testing"
"time"

autoscaling "k8s.io/api/autoscaling/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -31,20 +30,15 @@ import (
)

var (
anyTime = time.Unix(0, 0)
regularTargetRef = &autoscaling.CrossVersionObjectReference{
Kind: "Deployment",
Name: "test-deployment",
APIVersion: "apps/v1",
}
anyTime = time.Unix(0, 0)
// TODO(maxcao13): write tests for new container policy field
)

func TestMergeAggregateContainerState(t *testing.T) {

containersInitialAggregateState := ContainerNameToAggregateStateMap{}
containersInitialAggregateState["test"] = NewAggregateContainerState()
vpa := NewVpa(VpaID{}, nil, nil, anyTime)
vpa := NewVpa(VpaID{}, nil, anyTime)
vpa.ContainersInitialAggregateState = containersInitialAggregateState

containerNameToAggregateStateMap := ContainerNameToAggregateStateMap{}
Expand Down Expand Up @@ -126,7 +120,7 @@ func TestUpdateConditions(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
containerName := "container"
vpa := NewVpa(VpaID{Namespace: "test-namespace", VpaName: "my-favourite-vpa"}, labels.Nothing(), nil, time.Unix(0, 0))
vpa := NewVpa(VpaID{Namespace: "test-namespace", VpaName: "my-favourite-vpa"}, labels.Nothing(), time.Unix(0, 0))
if tc.hasRecommendation {
vpa.Recommendation = test.Recommendation().WithContainer(containerName).WithTarget("5", "200").Get()
}
Expand Down Expand Up @@ -196,7 +190,7 @@ func TestUpdateRecommendation(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
namespace := "test-namespace"
vpa := NewVpa(VpaID{Namespace: namespace, VpaName: "my-favourite-vpa"}, labels.Nothing(), regularTargetRef, anyTime)
vpa := NewVpa(VpaID{Namespace: namespace, VpaName: "my-favourite-vpa"}, labels.Nothing(), anyTime)
for container, rec := range tc.containers {
state := &AggregateContainerState{}
if rec != nil {
Expand Down Expand Up @@ -362,7 +356,7 @@ func TestUseAggregationIfMatching(t *testing.T) {
if !assert.NoError(t, err) {
t.FailNow()
}
vpa := NewVpa(VpaID{Namespace: namespace, VpaName: "my-favourite-vpa"}, selector, regularTargetRef, anyTime)
vpa := NewVpa(VpaID{Namespace: namespace, VpaName: "my-favourite-vpa"}, selector, anyTime)
vpa.UpdateMode = tc.updateMode
key := mockAggregateStateKey{
namespace: namespace,
Expand Down Expand Up @@ -549,7 +543,7 @@ func TestSetResourcePolicy(t *testing.T) {
if !assert.NoError(t, err) {
t.FailNow()
}
vpa := NewVpa(VpaID{Namespace: "test-namespace", VpaName: "my-favourite-vpa"}, selector, regularTargetRef, anyTime)
vpa := NewVpa(VpaID{Namespace: "test-namespace", VpaName: "my-favourite-vpa"}, selector, anyTime)
for _, container := range tc.containers {
containerKey, aggregation := testAggregation(vpa, container, labels.Set(testLabels).String())
vpa.aggregateContainerStates[containerKey] = aggregation
Expand Down
16 changes: 4 additions & 12 deletions vertical-pod-autoscaler/pkg/utils/vpa/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"strings"
"time"

autoscaling "k8s.io/api/autoscaling/v1"
core "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -220,19 +219,12 @@ func GetContainerControlledValues(name string, vpaResourcePolicy *vpa_types.PodR
}

// GetContainerPruningGracePeriod returns the pruning grace period for a container.
func GetContainerPruningGracePeriod(containerName string, vpaResourcePolicy *vpa_types.PodResourcePolicy, targetRef *autoscaling.CrossVersionObjectReference) meta.Duration {
func GetContainerPruningGracePeriod(containerName string, vpaResourcePolicy *vpa_types.PodResourcePolicy) (gracePeriod *meta.Duration) {
containerPolicy := GetContainerResourcePolicy(containerName, vpaResourcePolicy)
if containerPolicy == nil || containerPolicy.PruningGracePeriod == nil {
defaultGracePeriod := meta.Duration{Duration: time.Duration(0)}
if targetRef != nil && targetRef.Kind == "CronJob" {
// CronJob is a special case, because they create containers they are usually supposed to be deleted after the job is done.
// So we set a higher default grace period so that future recommendations for the same workload are not pruned too early.
// TODO(maxcao13): maybe it makes sense to set the default based on the cron schedule?
defaultGracePeriod = meta.Duration{Duration: 24 * time.Hour}
}
return defaultGracePeriod
if containerPolicy != nil {
gracePeriod = containerPolicy.PruningGracePeriod
}
return *containerPolicy.PruningGracePeriod
return
}

// CreateOrUpdateVpaCheckpoint updates the status field of the VPA Checkpoint API object.
Expand Down

0 comments on commit 098c138

Please sign in to comment.