Skip to content

Commit

Permalink
Introduce pruningGracePeriod which allows a grace period before the v…
Browse files Browse the repository at this point in the history
…pa prunes recommendations from non-existente containers

Signed-off-by: Max Cao <[email protected]>
  • Loading branch information
maxcao13 committed Dec 12, 2024
1 parent 9d94f62 commit b6e435b
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 18 deletions.
25 changes: 19 additions & 6 deletions vertical-pod-autoscaler/deploy/vpa-v1-crd-gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,19 @@ spec:
- Auto
- "Off"
type: string
pruningGracePeriod:
description: PruningGracePeriod is the duration to wait
before pruning recommendations for containers that no
longer exist. This is useful for containers created by
Jobs from CronJobs, which are frequently created and deleted.
By setting a grace period, recommendations for these containers
are not pruned immediately after they are removed, providing
recommendations to new containers created by subsequent
Jobs. If not specified, recommendations for non-existent
containers are pruned the next time a recommendation loop
is run. However, if the targetRef points to a CronJob,
the default value is 24 hours.
type: string
type: object
type: array
type: object
Expand All @@ -363,13 +376,13 @@ spec:
grouped by the target resource.
properties:
apiVersion:
description: API version of the referent
description: apiVersion is the API version of the referent
type: string
kind:
description: 'Kind of the referent; More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
description: 'kind is the kind of the referent; More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
name:
description: 'Name of the referent; More info: http://kubernetes.io/docs/user-guide/identifiers#names'
description: 'name is the name of the referent; More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
type: string
required:
- kind
Expand Down Expand Up @@ -639,13 +652,13 @@ spec:
grouped by the target resource.
properties:
apiVersion:
description: API version of the referent
description: apiVersion is the API version of the referent
type: string
kind:
description: 'Kind of the referent; More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
description: 'kind is the kind of the referent; More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
name:
description: 'Name of the referent; More info: http://kubernetes.io/docs/user-guide/identifiers#names'
description: 'name is the name of the referent; More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
type: string
required:
- kind
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,15 @@ type ContainerResourcePolicy struct {
// The default is "RequestsAndLimits".
// +optional
ControlledValues *ContainerControlledValues `json:"controlledValues,omitempty" protobuf:"bytes,6,rep,name=controlledValues"`

// PruningGracePeriod is the duration to wait before pruning recommendations for containers that no longer exist.
// This is useful for containers created by Jobs from CronJobs, which are frequently created and deleted.
// By setting a grace period, recommendations for these containers are not pruned immediately
// after they are removed, providing recommendations to new containers created by subsequent Jobs.
// If not specified, recommendations for non-existent containers are pruned the next time a recommendation
// loop is run. However, if the targetRef points to a CronJob, the default value is 24 hours.
// +optional
PruningGracePeriod *metav1.Duration `json:"pruningGracePeriod,omitempty" protobuf:"bytes,4,opt,name=pruningGracePeriod"`
}

const (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,10 +441,10 @@ func (feeder *clusterStateFeeder) SweepAggregates() {
// is still a reference to them in feeder.clusterState.aggregateStateMap, and those get
// garbage collected eventually by the rate limited aggregate garbage collector later.
// Maybe we should clean those up here too since we know which ones are stale?
now := time.Now()
for _, vpa := range feeder.clusterState.Vpas {

for containerKey, container := range vpa.AggregateContainerStates() {
if !container.IsUnderVPA {
if !container.IsUnderVPA && now.After(container.GetLastUpdate().Add(container.GetPruningGracePeriod().Duration)) {
klog.V(4).InfoS("Deleting Aggregate for VPA: container no longer present",
"namespace", vpa.ID.Namespace,
"vpaName", vpa.ID.VpaName,
Expand All @@ -454,7 +454,7 @@ func (feeder *clusterStateFeeder) SweepAggregates() {
}
}
for containerKey, container := range vpa.ContainersInitialAggregateState {
if !container.IsUnderVPA {
if !container.IsUnderVPA && now.After(container.GetLastUpdate().Add(container.GetPruningGracePeriod().Duration)) {
klog.V(4).InfoS("Deleting Initial Aggregate for VPA: container no longer present",
"namespace", vpa.ID.Namespace,
"vpaName", vpa.ID.VpaName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ type AggregateContainerState struct {
UpdateMode *vpa_types.UpdateMode
ScalingMode *vpa_types.ContainerScalingMode
ControlledResources *[]ResourceName
LastUpdateTime time.Time
PruningGracePeriod metav1.Duration
}

// GetLastRecommendation returns last recorded recommendation.
Expand Down Expand Up @@ -143,6 +145,16 @@ func (a *AggregateContainerState) GetControlledResources() []ResourceName {
return DefaultControlledResources
}

// GetLastUpdate returns the time of the last update of the VPA object controlling this aggregator.
func (a *AggregateContainerState) GetLastUpdate() time.Time {
return a.LastUpdateTime
}

// GetPruningGracePeriod returns the pruning grace period set in the VPA object controlling this aggregator.
func (a *AggregateContainerState) GetPruningGracePeriod() metav1.Duration {
return a.PruningGracePeriod
}

// MarkNotAutoscaled registers that this container state is not controlled by
// a VPA object.
func (a *AggregateContainerState) MarkNotAutoscaled() {
Expand Down Expand Up @@ -171,10 +183,12 @@ func (a *AggregateContainerState) MergeContainerState(other *AggregateContainerS
// NewAggregateContainerState returns a new, empty AggregateContainerState.
func NewAggregateContainerState() *AggregateContainerState {
config := GetAggregationsConfig()
now := time.Now()
return &AggregateContainerState{
AggregateCPUUsage: util.NewDecayingHistogram(config.CPUHistogramOptions, config.CPUHistogramDecayHalfLife),
AggregateMemoryPeaks: util.NewDecayingHistogram(config.MemoryHistogramOptions, config.MemoryHistogramDecayHalfLife),
CreationTime: time.Now(),
CreationTime: now,
LastUpdateTime: now,
}
}

Expand Down
4 changes: 3 additions & 1 deletion vertical-pod-autoscaler/pkg/recommender/model/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ func (cluster *ClusterState) AddOrUpdateContainer(containerID ContainerID, reque
container.Request = request
// Mark this container as still managed so the aggregates don't get garbage collected
aggregateState.IsUnderVPA = true
// Update the last update time so we potentially don't garbage collect it too soon
aggregateState.LastUpdateTime = time.Now()
}
return nil
}
Expand Down Expand Up @@ -284,7 +286,7 @@ func (cluster *ClusterState) AddOrUpdateVpa(apiObject *vpa_types.VerticalPodAuto
vpaExists = false
}
if !vpaExists {
vpa = NewVpa(vpaID, selector, apiObject.CreationTimestamp.Time)
vpa = NewVpa(vpaID, selector, apiObject.Spec.TargetRef, apiObject.CreationTimestamp.Time)
cluster.Vpas[vpaID] = vpa
for aggregationKey, aggregation := range cluster.aggregateStateMap {
vpa.UseAggregationIfMatching(aggregationKey, aggregation)
Expand Down
4 changes: 3 additions & 1 deletion vertical-pod-autoscaler/pkg/recommender/model/vpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ type Vpa struct {

// NewVpa returns a new Vpa with a given ID and pod selector. Doesn't set the
// links to the matched aggregations.
func NewVpa(id VpaID, selector labels.Selector, created time.Time) *Vpa {
func NewVpa(id VpaID, selector labels.Selector, targetRef *autoscaling.CrossVersionObjectReference, created time.Time) *Vpa {
vpa := &Vpa{
ID: id,
PodSelector: selector,
Expand All @@ -128,6 +128,7 @@ func NewVpa(id VpaID, selector labels.Selector, created time.Time) *Vpa {
Created: created,
Annotations: make(vpaAnnotationsMap),
Conditions: make(vpaConditionsMap),
TargetRef: targetRef,
// APIVersion defaults to the version of the client used to read resources.
// If a new version is introduced that needs to be differentiated beyond the
// client conversion, this needs to be done based on the resource content.
Expand Down Expand Up @@ -160,6 +161,7 @@ func (vpa *Vpa) UseAggregationIfMatching(aggregationKey AggregateStateKey, aggre
vpa.aggregateContainerStates[aggregationKey] = aggregation
aggregation.IsUnderVPA = true
aggregation.UpdateMode = vpa.UpdateMode
aggregation.PruningGracePeriod = vpa_api_util.GetContainerPruningGracePeriod(aggregationKey.ContainerName(), vpa.ResourcePolicy, vpa.TargetRef)
aggregation.UpdateFromPolicy(vpa_api_util.GetContainerResourcePolicy(aggregationKey.ContainerName(), vpa.ResourcePolicy))
}
}
Expand Down
19 changes: 13 additions & 6 deletions vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"
"time"

autoscaling "k8s.io/api/autoscaling/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -30,14 +31,20 @@ import (
)

var (
anyTime = time.Unix(0, 0)
anyTime = time.Unix(0, 0)
regularTargetRef = &autoscaling.CrossVersionObjectReference{
Kind: "Deployment",
Name: "test-deployment",
APIVersion: "apps/v1",
}
// TODO(maxcao13): write tests for new container policy field
)

func TestMergeAggregateContainerState(t *testing.T) {

containersInitialAggregateState := ContainerNameToAggregateStateMap{}
containersInitialAggregateState["test"] = NewAggregateContainerState()
vpa := NewVpa(VpaID{}, nil, anyTime)
vpa := NewVpa(VpaID{}, nil, nil, anyTime)
vpa.ContainersInitialAggregateState = containersInitialAggregateState

containerNameToAggregateStateMap := ContainerNameToAggregateStateMap{}
Expand Down Expand Up @@ -119,7 +126,7 @@ func TestUpdateConditions(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
containerName := "container"
vpa := NewVpa(VpaID{Namespace: "test-namespace", VpaName: "my-favourite-vpa"}, labels.Nothing(), time.Unix(0, 0))
vpa := NewVpa(VpaID{Namespace: "test-namespace", VpaName: "my-favourite-vpa"}, labels.Nothing(), nil, time.Unix(0, 0))
if tc.hasRecommendation {
vpa.Recommendation = test.Recommendation().WithContainer(containerName).WithTarget("5", "200").Get()
}
Expand Down Expand Up @@ -189,7 +196,7 @@ func TestUpdateRecommendation(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
namespace := "test-namespace"
vpa := NewVpa(VpaID{Namespace: namespace, VpaName: "my-favourite-vpa"}, labels.Nothing(), anyTime)
vpa := NewVpa(VpaID{Namespace: namespace, VpaName: "my-favourite-vpa"}, labels.Nothing(), regularTargetRef, anyTime)
for container, rec := range tc.containers {
state := &AggregateContainerState{}
if rec != nil {
Expand Down Expand Up @@ -355,7 +362,7 @@ func TestUseAggregationIfMatching(t *testing.T) {
if !assert.NoError(t, err) {
t.FailNow()
}
vpa := NewVpa(VpaID{Namespace: namespace, VpaName: "my-favourite-vpa"}, selector, anyTime)
vpa := NewVpa(VpaID{Namespace: namespace, VpaName: "my-favourite-vpa"}, selector, regularTargetRef, anyTime)
vpa.UpdateMode = tc.updateMode
key := mockAggregateStateKey{
namespace: namespace,
Expand Down Expand Up @@ -542,7 +549,7 @@ func TestSetResourcePolicy(t *testing.T) {
if !assert.NoError(t, err) {
t.FailNow()
}
vpa := NewVpa(VpaID{Namespace: "test-namespace", VpaName: "my-favourite-vpa"}, selector, anyTime)
vpa := NewVpa(VpaID{Namespace: "test-namespace", VpaName: "my-favourite-vpa"}, selector, regularTargetRef, anyTime)
for _, container := range tc.containers {
containerKey, aggregation := testAggregation(vpa, container, labels.Set(testLabels).String())
vpa.aggregateContainerStates[containerKey] = aggregation
Expand Down
17 changes: 17 additions & 0 deletions vertical-pod-autoscaler/pkg/utils/vpa/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"
"time"

autoscaling "k8s.io/api/autoscaling/v1"
core "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -218,6 +219,22 @@ func GetContainerControlledValues(name string, vpaResourcePolicy *vpa_types.PodR
return *containerPolicy.ControlledValues
}

// GetContainerPruningGracePeriod returns the pruning grace period for a container.
func GetContainerPruningGracePeriod(containerName string, vpaResourcePolicy *vpa_types.PodResourcePolicy, targetRef *autoscaling.CrossVersionObjectReference) meta.Duration {
containerPolicy := GetContainerResourcePolicy(containerName, vpaResourcePolicy)
if containerPolicy == nil || containerPolicy.PruningGracePeriod == nil {
defaultGracePeriod := meta.Duration{Duration: time.Duration(0)}
if targetRef != nil && targetRef.Kind == "CronJob" {
// CronJob is a special case, because they create containers they are usually supposed to be deleted after the job is done.
// So we set a higher default grace period so that future recommendations for the same workload are not pruned too early.
// TODO(maxcao13): maybe it makes sense to set the default based on the cron schedule?
defaultGracePeriod = meta.Duration{Duration: 24 * time.Hour}
}
return defaultGracePeriod
}
return *containerPolicy.PruningGracePeriod
}

// CreateOrUpdateVpaCheckpoint updates the status field of the VPA Checkpoint API object.
// If object doesn't exits it is created.
func CreateOrUpdateVpaCheckpoint(vpaCheckpointClient vpa_api.VerticalPodAutoscalerCheckpointInterface,
Expand Down

0 comments on commit b6e435b

Please sign in to comment.