VPA: prune stale container aggregates, split recommendations over true number of containers #6745

Open · wants to merge 7 commits into base: master
25 changes: 19 additions & 6 deletions vertical-pod-autoscaler/deploy/vpa-v1-crd-gen.yaml
@@ -346,6 +346,19 @@ spec:
- Auto
- "Off"
type: string
pruningGracePeriod:
description: PruningGracePeriod is the duration to wait
before pruning recommendations for containers that no
longer exist. This is useful for containers created by
Jobs from CronJobs, which are frequently created and deleted.
By setting a grace period, recommendations for these containers
are not pruned immediately after they are removed, providing
recommendations to new containers created by subsequent
Jobs. If not specified, recommendations for non-existent
containers are pruned the next time a recommendation loop
is run. However, if the targetRef points to a CronJob,
the default value is 24 hours.
type: string
type: object
type: array
type: object
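For readers of the CRD change above, here is a minimal sketch of what setting this field looks like through the VPA Go API (in the CRD YAML it is the `pruningGracePeriod` duration string on a container policy, e.g. `"24h"`). The 24-hour value and the surrounding code are illustrative, not part of this diff.

```go
package main

import (
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
)

func main() {
	// Keep recommendations for containers that have disappeared for 24 hours
	// before pruning them, e.g. containers that come and go with CronJob runs.
	policy := &vpa_types.PodResourcePolicy{
		ContainerPolicies: []vpa_types.ContainerResourcePolicy{{
			ContainerName:      "*",
			PruningGracePeriod: &metav1.Duration{Duration: 24 * time.Hour},
		}},
	}
	fmt.Println(policy.ContainerPolicies[0].PruningGracePeriod.Duration) // 24h0m0s
}
```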
@@ -363,13 +376,13 @@ spec:
grouped by the target resource.
properties:
apiVersion:
description: API version of the referent
description: apiVersion is the API version of the referent
type: string
kind:
description: 'Kind of the referent; More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
description: 'kind is the kind of the referent; More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
name:
description: 'Name of the referent; More info: http://kubernetes.io/docs/user-guide/identifiers#names'
description: 'name is the name of the referent; More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
type: string
required:
- kind
@@ -639,13 +652,13 @@ spec:
grouped by the target resource.
properties:
apiVersion:
description: API version of the referent
description: apiVersion is the API version of the referent
type: string
kind:
description: 'Kind of the referent; More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
description: 'kind is the kind of the referent; More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
name:
description: 'Name of the referent; More info: http://kubernetes.io/docs/user-guide/identifiers#names'
description: 'name is the name of the referent; More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
type: string
required:
- kind
8 changes: 8 additions & 0 deletions vertical-pod-autoscaler/e2e/v1/common.go
@@ -352,6 +352,14 @@ func PatchVpaRecommendation(f *framework.Framework, vpa *vpa_types.VerticalPodAu
gomega.Expect(err).NotTo(gomega.HaveOccurred(), "Failed to patch VPA.")
}

// PatchDeployment patches a deployment with a given patch.
func PatchDeployment(f *framework.Framework, deployment *appsv1.Deployment, patch *patchRecord) {
patchBytes, err := json.Marshal([]patchRecord{*patch})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
_, err = f.ClientSet.AppsV1().Deployments(f.Namespace.Name).Patch(context.TODO(), deployment.Name, types.JSONPatchType, patchBytes, metav1.PatchOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred(), "unexpected error patching deployment")
}

// AnnotatePod adds annotation for an existing pod.
func AnnotatePod(f *framework.Framework, podName, annotationName, annotationValue string) {
bytes, err := json.Marshal([]patchRecord{{
97 changes: 97 additions & 0 deletions vertical-pod-autoscaler/e2e/v1/recommender.go
@@ -411,6 +411,103 @@ var _ = RecommenderE2eDescribe("VPA CRD object", func() {
})
})

const recommendationLoopInterval = 1 * time.Minute

var _ = RecommenderE2eDescribe("VPA CRD object", func() {
f := framework.NewDefaultFramework("vertical-pod-autoscaling")
f.NamespacePodSecurityEnforceLevel = podsecurity.LevelBaseline

var vpaClientSet vpa_clientset.Interface

ginkgo.BeforeEach(func() {
vpaClientSet = getVpaClientSet(f)
})

ginkgo.It("only provides recommendation to containers that exist when renaming a container", func() {
ginkgo.By("Setting up a hamster deployment")
d := NewNHamstersDeployment(f, 1 /*number of containers*/)
_ = startDeploymentPods(f, d)

ginkgo.By("Setting up VPA CRD")
vpaCRD := test.VerticalPodAutoscaler().
WithName("hamster-vpa").
WithNamespace(f.Namespace.Name).
WithTargetRef(hamsterTargetRef).
WithContainer("*").
Get()

InstallVPA(f, vpaCRD)

ginkgo.By("Waiting for recommendation to be filled for the container")
vpa, err := WaitForRecommendationPresent(vpaClientSet, vpaCRD)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Expect(vpa.Status.Recommendation.ContainerRecommendations).Should(gomega.HaveLen(1))
gomega.Expect(vpa.Status.Recommendation.ContainerRecommendations[0].ContainerName).To(gomega.Equal(GetHamsterContainerNameByIndex(0)))

ginkgo.By("Renaming the container")
newContainerName := "renamed-container"
patchRecord := &patchRecord{
Op: "replace",
Path: "/spec/template/spec/containers/0/name",
Value: newContainerName,
}
gomega.Expect(err).NotTo(gomega.HaveOccurred())
PatchDeployment(f, d, patchRecord)

ginkgo.By("Waiting for recommendation to be filled for the renamed container and only the renamed container")
time.Sleep(recommendationLoopInterval)
vpa, err = WaitForRecommendationPresent(vpaClientSet, vpaCRD)

gomega.Expect(err).NotTo(gomega.HaveOccurred())
errMsg := fmt.Sprintf("%s is the only container in the VPA CR. There should not be any recommendations for %s",
newContainerName,
GetHamsterContainerNameByIndex(0))
gomega.Expect(vpa.Status.Recommendation.ContainerRecommendations).Should(gomega.HaveLen(1), errMsg)
gomega.Expect(vpa.Status.Recommendation.ContainerRecommendations[0].ContainerName).To(gomega.Equal(newContainerName), errMsg)
})

ginkgo.It("only provides recommendation to containers that exist when removing a container", func() {
ginkgo.By("Setting up a hamster deployment")
d := NewNHamstersDeployment(f, 2 /*number of containers*/)
_ = startDeploymentPods(f, d)

ginkgo.By("Setting up VPA CRD")
vpaCRD := test.VerticalPodAutoscaler().
WithName("hamster-vpa").
WithNamespace(f.Namespace.Name).
WithTargetRef(hamsterTargetRef).
WithContainer("*").
Get()

InstallVPA(f, vpaCRD)

ginkgo.By("Waiting for recommendation to be filled for both containers")
vpa, err := WaitForRecommendationPresent(vpaClientSet, vpaCRD)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Expect(vpa.Status.Recommendation.ContainerRecommendations).Should(gomega.HaveLen(2))

ginkgo.By("Removing the second container")
patchRecord := &patchRecord{
Op: "remove",
Path: "/spec/template/spec/containers/1",
}
gomega.Expect(err).NotTo(gomega.HaveOccurred())
PatchDeployment(f, d, patchRecord)

ginkgo.By("Waiting for recommendation to be filled for just one container")
time.Sleep(recommendationLoopInterval)
vpa, err = WaitForRecommendationPresent(vpaClientSet, vpaCRD)

gomega.Expect(err).NotTo(gomega.HaveOccurred())
errMsg := fmt.Sprintf("%s is now the only container in the VPA CR. There should not be any recommendations for %s",
GetHamsterContainerNameByIndex(0),
GetHamsterContainerNameByIndex(1))
gomega.Expect(vpa.Status.Recommendation.ContainerRecommendations).Should(gomega.HaveLen(1), errMsg)
gomega.Expect(vpa.Status.Recommendation.ContainerRecommendations[0].ContainerName).To(gomega.Equal(GetHamsterContainerNameByIndex(0)), errMsg)
})

})

func deleteRecommender(c clientset.Interface) error {
namespace := "kube-system"
listOptions := metav1.ListOptions{}
@@ -214,6 +214,15 @@ type ContainerResourcePolicy struct {
// The default is "RequestsAndLimits".
// +optional
ControlledValues *ContainerControlledValues `json:"controlledValues,omitempty" protobuf:"bytes,6,rep,name=controlledValues"`

// PruningGracePeriod is the duration to wait before pruning recommendations for containers that no longer exist.
// This is useful for containers created by Jobs from CronJobs, which are frequently created and deleted.
// By setting a grace period, recommendations for these containers are not pruned immediately
// after they are removed, providing recommendations to new containers created by subsequent Jobs.
// If not specified, recommendations for non-existent containers are pruned the next time a recommendation
// loop is run. However, if the targetRef points to a CronJob, the default value is 24 hours.
// +optional
PruningGracePeriod *metav1.Duration `json:"pruningGracePeriod,omitempty" protobuf:"bytes,4,opt,name=pruningGracePeriod"`
}

const (
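The SweepAggregates code further down relies on an IsAggregateStale check whose implementation is not part of this diff. Purely as an illustration of how a grace period like the one above could drive such a check (the function name, the lastUpdate parameter, and the nil-means-prune-immediately default are assumptions, not the PR's actual code):

```go
package sketch

import (
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// isAggregateStale sketches a grace-period-driven staleness test: an aggregate
// whose container is gone becomes prunable once the grace period has elapsed
// since the aggregate was last updated; with no grace period configured it is
// prunable on the next recommendation loop.
func isAggregateStale(lastUpdate time.Time, gracePeriod *metav1.Duration, now time.Time) bool {
	if gracePeriod == nil {
		return true
	}
	return now.After(lastUpdate.Add(gracePeriod.Duration))
}
```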
75 changes: 74 additions & 1 deletion vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go
@@ -73,6 +73,12 @@ type ClusterStateFeeder interface {

// GarbageCollectCheckpoints removes historical checkpoints that don't have a matching VPA.
GarbageCollectCheckpoints()

// MarkAggregates marks all aggregates in all VPAs as no longer under their VPA (IsUnderVPA=false)
MarkAggregates()

// SweepAggregates garbage collects, from every VPA's aggregate lists, the aggregates that are no longer under a VPA
SweepAggregates()
}

// ClusterStateFeederFactory makes instances of ClusterStateFeeder.
@@ -290,7 +296,7 @@ func (feeder *clusterStateFeeder) GarbageCollectCheckpoints() {
}
for _, checkpoint := range checkpointList.Items {
vpaID := model.VpaID{Namespace: checkpoint.Namespace, VpaName: checkpoint.Spec.VPAObjectName}
_, exists := feeder.clusterState.Vpas[vpaID]
vpa, exists := feeder.clusterState.Vpas[vpaID]
if !exists {
err = feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).Delete(context.TODO(), checkpoint.Name, metav1.DeleteOptions{})
if err == nil {
@@ -299,6 +305,20 @@
klog.ErrorS(err, "Orphaned VPA checkpoint cleanup - error deleting", "checkpoint", klog.KRef(namespace, checkpoint.Name))
}
}
// Also clean up a checkpoint if the VPA is still there, but the container is gone. AggregateStateByContainerName
// merges in the initial aggregates so we can use it to check "both lists" (initial, aggregates) at once
// TODO(jkyros): could we also just wait until it gets "old" enough (e.g. the checkpoint hasn't
// been updated for an hour) and then blow it away? Because once we remove it from the aggregate lists,
// it will stop being maintained.
_, aggregateExists := vpa.AggregateStateByContainerName()[checkpoint.Spec.ContainerName]
if !aggregateExists {
err = feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).Delete(context.TODO(), checkpoint.Name, metav1.DeleteOptions{})
if err == nil {
klog.V(3).InfoS("Orphaned VPA checkpoint cleanup - deleting", "checkpoint", klog.KRef(namespace, checkpoint.Name))
} else {
klog.ErrorS(err, "Orphaned VPA checkpoint cleanup - error deleting", "checkpoint", klog.KRef(namespace, checkpoint.Name))
}
}
}
}
}
@@ -396,6 +416,59 @@ func (feeder *clusterStateFeeder) LoadVPAs(ctx context.Context) {
feeder.clusterState.ObservedVpas = vpaCRDs
}

// MarkAggregates marks all aggregates IsUnderVPA=false, so when we go
// through LoadPods(), the valid ones will get marked back to true, and
// we can garbage collect the false ones from the VPAs' aggregate lists.
func (feeder *clusterStateFeeder) MarkAggregates() {
for _, vpa := range feeder.clusterState.Vpas {
for _, container := range vpa.AggregateContainerStates() {
container.IsUnderVPA = false
}
for _, container := range vpa.ContainersInitialAggregateState {
container.IsUnderVPA = false
}
}
}

// SweepAggregates garbage collects all aggregates/initial aggregates from the VPA where the
// aggregate's container no longer exists.
func (feeder *clusterStateFeeder) SweepAggregates() {

var aggregatesPruned int
var initialAggregatesPruned int

// TODO(jkyros): This only removes the container state from the VPA's aggregate states, there
// is still a reference to them in feeder.clusterState.aggregateStateMap, and those get
// garbage collected eventually by the rate limited aggregate garbage collector later.
// Maybe we should clean those up here too since we know which ones are stale?
Reviewer (Member): Is it a lot of extra work to do that? Do you see any risks doing it here?

@jkyros (author), May 2, 2024: No, I don't think it's a lot of extra work; it should be reasonably cheap to clean them up here, since it's just deletions from the other maps if the keys exist. I just didn't know all the history.

It seemed possible, at least, that we were intentionally waiting to clean up the aggregates so that if there was an unexpected hiccup we didn't immediately blow away all the aggregate history we worked so hard to get. (Maybe someone oopses, deletes their deployment, then puts it back? Right now we don't have to start over: the pods come back in, find their container aggregates, and resume. But if I clean them up here, we have to start over.)

now := time.Now()
for _, vpa := range feeder.clusterState.Vpas {
for containerKey, container := range vpa.AggregateContainerStates() {
if !container.IsUnderVPA && container.IsAggregateStale(now) {
klog.V(4).InfoS("Deleting Aggregate for VPA: container no longer present",
"namespace", vpa.ID.Namespace,
"vpaName", vpa.ID.VpaName,
"containerName", containerKey.ContainerName())
vpa.DeleteAggregation(containerKey)
aggregatesPruned = aggregatesPruned + 1
}
}
for containerKey, container := range vpa.ContainersInitialAggregateState {
if !container.IsUnderVPA && container.IsAggregateStale(now) {
klog.V(4).InfoS("Deleting Initial Aggregate for VPA: container no longer present",
"namespace", vpa.ID.Namespace,
"vpaName", vpa.ID.VpaName,
"containerName", containerKey)
delete(vpa.ContainersInitialAggregateState, containerKey)
initialAggregatesPruned = initialAggregatesPruned + 1
}
}
}
if initialAggregatesPruned > 0 || aggregatesPruned > 0 {
klog.InfoS("Pruned aggregate and initial aggregate containers", "aggregatesPruned", aggregatesPruned, "initialAggregatesPruned", initialAggregatesPruned)
}
}

// LoadPods loads pod into the cluster state.
func (feeder *clusterStateFeeder) LoadPods() {
podSpecs, err := feeder.specClient.GetPodSpecs()
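The hunks above add the mark and sweep hooks but not their call site. Going by the MarkAggregates comment (mark everything, let LoadPods flip live containers back to IsUnderVPA=true, then sweep), the feed step of the recommender presumably orders the calls roughly as sketched below; the wrapper function and the exact sequence are assumptions, not code from this PR.

```go
package sketch

import (
	"context"

	"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/input"
)

// feedOnce shows one plausible ordering of the new hooks around the existing
// feeder calls: mark before loading pods, sweep after.
func feedOnce(ctx context.Context, feeder input.ClusterStateFeeder) {
	feeder.LoadVPAs(ctx)     // refresh the set of VPA objects
	feeder.MarkAggregates()  // assume every tracked aggregate is no longer under a VPA
	feeder.LoadPods()        // containers that still exist flip their aggregates back to IsUnderVPA=true
	feeder.SweepAggregates() // drop aggregates left unmarked past their grace period
	// Checkpoint garbage collection (which now also covers checkpoints for
	// pruned containers, see the GarbageCollectCheckpoints hunk) typically
	// runs on its own, less frequent cadence.
}
```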
6 changes: 3 additions & 3 deletions vertical-pod-autoscaler/pkg/recommender/logic/recommender.go
@@ -39,7 +39,7 @@ var (

// PodResourceRecommender computes resource recommendation for a Vpa object.
type PodResourceRecommender interface {
GetRecommendedPodResources(containerNameToAggregateStateMap model.ContainerNameToAggregateStateMap) RecommendedPodResources
GetRecommendedPodResources(containerNameToAggregateStateMap model.ContainerNameToAggregateStateMap, containersPerPod int) RecommendedPodResources
}

// RecommendedPodResources is a Map from container name to recommended resources.
@@ -65,13 +65,13 @@ type podResourceRecommender struct
upperBoundMemory MemoryEstimator
}

func (r *podResourceRecommender) GetRecommendedPodResources(containerNameToAggregateStateMap model.ContainerNameToAggregateStateMap) RecommendedPodResources {
func (r *podResourceRecommender) GetRecommendedPodResources(containerNameToAggregateStateMap model.ContainerNameToAggregateStateMap, containersPerPod int) RecommendedPodResources {
var recommendation = make(RecommendedPodResources)
if len(containerNameToAggregateStateMap) == 0 {
return recommendation
}

fraction := 1.0 / float64(len(containerNameToAggregateStateMap))
fraction := 1.0 / float64(containersPerPod)
minCPU := model.ScaleResource(model.CPUAmountFromCores(*podMinCPUMillicores*0.001), fraction)
minMemory := model.ScaleResource(model.MemoryAmountFromBytes(*podMinMemoryMb*1024*1024), fraction)

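The behavioral change in this hunk is easiest to see with numbers: the minimum pod resources used to be divided by the number of aggregates the recommender happened to be tracking, so a stale aggregate (for example, one left behind by a renamed container) shrank the share given to the containers that actually exist; now the split uses the true container count passed in by the caller. A toy illustration (the variable names are mine, not the PR's):

```go
package main

import "fmt"

func main() {
	containersPerPod := 1  // the pod spec has one container (it was just renamed)
	trackedAggregates := 2 // state still exists for both the old and the new name

	oldFraction := 1.0 / float64(trackedAggregates) // 0.50: the live container got only half the pod minimum
	newFraction := 1.0 / float64(containersPerPod)  // 1.00: the split uses the real container count

	fmt.Printf("old=%.2f new=%.2f\n", oldFraction, newFraction)
}
```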
@@ -40,7 +40,7 @@ func TestMinResourcesApplied(t *testing.T) {
"container-1": &model.AggregateContainerState{},
}

recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap)
recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap, len(containerNameToAggregateStateMap))
assert.Equal(t, model.CPUAmountFromCores(*podMinCPUMillicores/1000), recommendedResources["container-1"].Target[model.ResourceCPU])
assert.Equal(t, model.MemoryAmountFromBytes(*podMinMemoryMb*1024*1024), recommendedResources["container-1"].Target[model.ResourceMemory])
}
@@ -63,7 +63,7 @@ func TestMinResourcesSplitAcrossContainers(t *testing.T) {
"container-2": &model.AggregateContainerState{},
}

recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap)
recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap, len(containerNameToAggregateStateMap))
assert.Equal(t, model.CPUAmountFromCores((*podMinCPUMillicores/1000)/2), recommendedResources["container-1"].Target[model.ResourceCPU])
assert.Equal(t, model.CPUAmountFromCores((*podMinCPUMillicores/1000)/2), recommendedResources["container-2"].Target[model.ResourceCPU])
assert.Equal(t, model.MemoryAmountFromBytes((*podMinMemoryMb*1024*1024)/2), recommendedResources["container-1"].Target[model.ResourceMemory])
@@ -90,7 +90,7 @@ func TestControlledResourcesFiltered(t *testing.T) {
},
}

recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap)
recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap, len(containerNameToAggregateStateMap))
assert.Contains(t, recommendedResources[containerName].Target, model.ResourceMemory)
assert.Contains(t, recommendedResources[containerName].LowerBound, model.ResourceMemory)
assert.Contains(t, recommendedResources[containerName].UpperBound, model.ResourceMemory)
@@ -119,7 +119,7 @@ func TestControlledResourcesFilteredDefault(t *testing.T) {
},
}

recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap)
recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap, len(containerNameToAggregateStateMap))
assert.Contains(t, recommendedResources[containerName].Target, model.ResourceMemory)
assert.Contains(t, recommendedResources[containerName].LowerBound, model.ResourceMemory)
assert.Contains(t, recommendedResources[containerName].UpperBound, model.ResourceMemory)