diff --git a/vertical-pod-autoscaler/deploy/vpa-v1-crd-gen.yaml b/vertical-pod-autoscaler/deploy/vpa-v1-crd-gen.yaml
index 3811ca72bf1..ad2b9de3e00 100644
--- a/vertical-pod-autoscaler/deploy/vpa-v1-crd-gen.yaml
+++ b/vertical-pod-autoscaler/deploy/vpa-v1-crd-gen.yaml
@@ -346,6 +346,19 @@ spec:
                          - Auto
                          - "Off"
                          type: string
+                        pruningGracePeriod:
+                          description: PruningGracePeriod is the duration to wait
+                            before pruning recommendations for containers that no
+                            longer exist. This is useful for containers created by
+                            Jobs from CronJobs, which are frequently created and deleted.
+                            By setting a grace period, recommendations for these containers
+                            are not pruned immediately after they are removed, providing
+                            recommendations to new containers created by subsequent
+                            Jobs. If not specified, recommendations for non-existent
+                            containers are pruned the next time a recommendation loop
+                            is run. However, if the targetRef points to a CronJob,
+                            the default value is 24 hours.
+                          type: string
                       type: object
                     type: array
                 type: object
@@ -363,13 +376,13 @@ spec:
                 grouped by the target resource.
              properties:
                apiVersion:
-                  description: API version of the referent
+                  description: apiVersion is the API version of the referent
                  type: string
                kind:
-                  description: 'Kind of the referent; More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
+                  description: 'kind is the kind of the referent; More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
                  type: string
                name:
-                  description: 'Name of the referent; More info: http://kubernetes.io/docs/user-guide/identifiers#names'
+                  description: 'name is the name of the referent; More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
                  type: string
              required:
              - kind
@@ -639,13 +652,13 @@ spec:
                 grouped by the target resource.
              properties:
                apiVersion:
-                  description: API version of the referent
+                  description: apiVersion is the API version of the referent
                  type: string
                kind:
-                  description: 'Kind of the referent; More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
+                  description: 'kind is the kind of the referent; More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
                  type: string
                name:
-                  description: 'Name of the referent; More info: http://kubernetes.io/docs/user-guide/identifiers#names'
+                  description: 'name is the name of the referent; More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
                  type: string
              required:
              - kind
diff --git a/vertical-pod-autoscaler/e2e/v1/common.go b/vertical-pod-autoscaler/e2e/v1/common.go
index 6ce493c9a8b..a554a2a864e 100644
--- a/vertical-pod-autoscaler/e2e/v1/common.go
+++ b/vertical-pod-autoscaler/e2e/v1/common.go
@@ -352,6 +352,14 @@ func PatchVpaRecommendation(f *framework.Framework, vpa *vpa_types.VerticalPodAu
 	gomega.Expect(err).NotTo(gomega.HaveOccurred(), "Failed to patch VPA.")
 }
 
+// PatchDeployment patches a deployment with a given patch.
+func PatchDeployment(f *framework.Framework, deployment *appsv1.Deployment, patch *patchRecord) {
+	patchBytes, err := json.Marshal([]patchRecord{*patch})
+	gomega.Expect(err).NotTo(gomega.HaveOccurred())
+	_, err = f.ClientSet.AppsV1().Deployments(f.Namespace.Name).Patch(context.TODO(), deployment.Name, types.JSONPatchType, patchBytes, metav1.PatchOptions{})
+	gomega.Expect(err).NotTo(gomega.HaveOccurred(), "unexpected error patching deployment")
+}
+
 // AnnotatePod adds annotation for an existing pod.
 func AnnotatePod(f *framework.Framework, podName, annotationName, annotationValue string) {
 	bytes, err := json.Marshal([]patchRecord{{
diff --git a/vertical-pod-autoscaler/e2e/v1/recommender.go b/vertical-pod-autoscaler/e2e/v1/recommender.go
index 8f171f33f5d..55398da3642 100644
--- a/vertical-pod-autoscaler/e2e/v1/recommender.go
+++ b/vertical-pod-autoscaler/e2e/v1/recommender.go
@@ -411,6 +411,101 @@ var _ = RecommenderE2eDescribe("VPA CRD object", func() {
 	})
 })
 
+const recommendationLoopInterval = 1 * time.Minute
+
+var _ = RecommenderE2eDescribe("VPA CRD object", func() {
+	f := framework.NewDefaultFramework("vertical-pod-autoscaling")
+	f.NamespacePodSecurityEnforceLevel = podsecurity.LevelBaseline
+
+	var vpaClientSet vpa_clientset.Interface
+
+	ginkgo.BeforeEach(func() {
+		vpaClientSet = getVpaClientSet(f)
+	})
+
+	ginkgo.It("only provides recommendation to containers that exist when renaming a container", func() {
+		ginkgo.By("Setting up a hamster deployment")
+		d := NewNHamstersDeployment(f, 1 /*number of containers*/)
+		_ = startDeploymentPods(f, d)
+
+		ginkgo.By("Setting up VPA CRD")
+		vpaCRD := test.VerticalPodAutoscaler().
+			WithName("hamster-vpa").
+			WithNamespace(f.Namespace.Name).
+			WithTargetRef(hamsterTargetRef).
+			WithContainer("*").
+			Get()
+
+		InstallVPA(f, vpaCRD)
+
+		ginkgo.By("Waiting for recommendation to be filled for the container")
+		vpa, err := WaitForRecommendationPresent(vpaClientSet, vpaCRD)
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
+		gomega.Expect(vpa.Status.Recommendation.ContainerRecommendations).Should(gomega.HaveLen(1))
+		gomega.Expect(vpa.Status.Recommendation.ContainerRecommendations[0].ContainerName).To(gomega.Equal(GetHamsterContainerNameByIndex(0)))
+
+		ginkgo.By("Renaming the container")
+		newContainerName := "renamed-container"
+		patchRecord := &patchRecord{
+			Op:    "replace",
+			Path:  "/spec/template/spec/containers/0/name",
+			Value: newContainerName,
+		}
+		PatchDeployment(f, d, patchRecord)
+
+		ginkgo.By("Waiting for recommendation to be filled for the renamed container and only the renamed container")
+		time.Sleep(recommendationLoopInterval)
+		vpa, err = WaitForRecommendationPresent(vpaClientSet, vpaCRD)
+
+		gomega.Expect(err).NotTo(gomega.HaveOccurred())
+		errMsg := fmt.Sprintf("%s is the only container in the VPA CR. There should not be any recommendations for %s",
+			newContainerName,
+			GetHamsterContainerNameByIndex(0))
+		gomega.Expect(vpa.Status.Recommendation.ContainerRecommendations).Should(gomega.HaveLen(1), errMsg)
+		gomega.Expect(vpa.Status.Recommendation.ContainerRecommendations[0].ContainerName).To(gomega.Equal(newContainerName), errMsg)
+	})
+
+	ginkgo.It("only provides recommendation to containers that exist when removing a container", func() {
+		ginkgo.By("Setting up a hamster deployment")
+		d := NewNHamstersDeployment(f, 2 /*number of containers*/)
+		_ = startDeploymentPods(f, d)
+
+		ginkgo.By("Setting up VPA CRD")
+		vpaCRD := test.VerticalPodAutoscaler().
+ WithName("hamster-vpa"). + WithNamespace(f.Namespace.Name). + WithTargetRef(hamsterTargetRef). + WithContainer("*"). + Get() + + InstallVPA(f, vpaCRD) + + ginkgo.By("Waiting for recommendation to be filled for both containers") + vpa, err := WaitForRecommendationPresent(vpaClientSet, vpaCRD) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + gomega.Expect(vpa.Status.Recommendation.ContainerRecommendations).Should(gomega.HaveLen(2)) + + ginkgo.By("Removing the second container") + patchRecord := &patchRecord{ + Op: "remove", + Path: "/spec/template/spec/containers/1", + } + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + PatchDeployment(f, d, patchRecord) + + ginkgo.By("Waiting for recommendation to be filled for just one container") + time.Sleep(recommendationLoopInterval) + vpa, err = WaitForRecommendationPresent(vpaClientSet, vpaCRD) + + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + errMsg := fmt.Sprintf("%s is now the only container in the VPA CR. There should not be any recommendations for %s", + GetHamsterContainerNameByIndex(0), + GetHamsterContainerNameByIndex(1)) + gomega.Expect(vpa.Status.Recommendation.ContainerRecommendations).Should(gomega.HaveLen(1), errMsg) + gomega.Expect(vpa.Status.Recommendation.ContainerRecommendations[0].ContainerName).To(gomega.Equal(GetHamsterContainerNameByIndex(0)), errMsg) + }) + +}) + func deleteRecommender(c clientset.Interface) error { namespace := "kube-system" listOptions := metav1.ListOptions{} diff --git a/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1/types.go b/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1/types.go index c1a686371e6..68a38f1bc2c 100644 --- a/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1/types.go +++ b/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1/types.go @@ -214,6 +214,15 @@ type ContainerResourcePolicy struct { // The default is "RequestsAndLimits". // +optional ControlledValues *ContainerControlledValues `json:"controlledValues,omitempty" protobuf:"bytes,6,rep,name=controlledValues"` + + // PruningGracePeriod is the duration to wait before pruning recommendations for containers that no longer exist. + // This is useful for containers created by Jobs from CronJobs, which are frequently created and deleted. + // By setting a grace period, recommendations for these containers are not pruned immediately + // after they are removed, providing recommendations to new containers created by subsequent Jobs. + // If not specified, recommendations for non-existent containers are pruned the next time a recommendation + // loop is run. However, if the targetRef points to a CronJob, the default value is 24 hours. + // +optional + PruningGracePeriod *metav1.Duration `json:"pruningGracePeriod,omitempty" protobuf:"bytes,4,opt,name=pruningGracePeriod"` } const ( diff --git a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go index 5fef7df5deb..ffcad65d24c 100644 --- a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go +++ b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go @@ -73,6 +73,12 @@ type ClusterStateFeeder interface { // GarbageCollectCheckpoints removes historical checkpoints that don't have a matching VPA. 
 	GarbageCollectCheckpoints()
+
+	// MarkAggregates marks all aggregates in all VPAs as not under a VPA.
+	MarkAggregates()
+
+	// SweepAggregates garbage collects all aggregates in all VPAs' aggregate lists that are no longer under a VPA.
+	SweepAggregates()
 }
 
 // ClusterStateFeederFactory makes instances of ClusterStateFeeder.
@@ -290,7 +296,7 @@ func (feeder *clusterStateFeeder) GarbageCollectCheckpoints() {
 		}
 		for _, checkpoint := range checkpointList.Items {
 			vpaID := model.VpaID{Namespace: checkpoint.Namespace, VpaName: checkpoint.Spec.VPAObjectName}
-			_, exists := feeder.clusterState.Vpas[vpaID]
+			vpa, exists := feeder.clusterState.Vpas[vpaID]
 			if !exists {
 				err = feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).Delete(context.TODO(), checkpoint.Name, metav1.DeleteOptions{})
 				if err == nil {
@@ -299,6 +305,22 @@
 				} else {
 					klog.ErrorS(err, "Orphaned VPA checkpoint cleanup - error deleting", "checkpoint", klog.KRef(namespace, checkpoint.Name))
 				}
+				// The VPA is gone, so there is no aggregate state to check; move on to the next checkpoint.
+				continue
 			}
+			// Also clean up a checkpoint if the VPA is still there, but the container is gone. AggregateStateByContainerName
+			// merges in the initial aggregates, so we can use it to check "both lists" (initial, aggregates) at once.
+			// TODO(jkyros): could we also just wait until it gets "old" enough, e.g. if the checkpoint hasn't
+			// been updated for an hour, blow it away? Because once we remove it from the aggregate lists, it will stop
+			// being maintained.
+			_, aggregateExists := vpa.AggregateStateByContainerName()[checkpoint.Spec.ContainerName]
+			if !aggregateExists {
+				err = feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).Delete(context.TODO(), checkpoint.Name, metav1.DeleteOptions{})
+				if err == nil {
+					klog.V(3).InfoS("Orphaned VPA checkpoint cleanup - deleting", "checkpoint", klog.KRef(namespace, checkpoint.Name))
+				} else {
+					klog.ErrorS(err, "Orphaned VPA checkpoint cleanup - error deleting", "checkpoint", klog.KRef(namespace, checkpoint.Name))
+				}
+			}
 		}
 	}
 }
@@ -396,6 +416,58 @@ func (feeder *clusterStateFeeder) LoadVPAs(ctx context.Context) {
 	feeder.clusterState.ObservedVpas = vpaCRDs
 }
 
+// MarkAggregates marks all aggregates with IsUnderVPA=false, so when we go
+// through LoadPods(), the valid ones will get marked back to true, and
+// we can garbage collect the false ones from the VPAs' aggregate lists.
+func (feeder *clusterStateFeeder) MarkAggregates() {
+	for _, vpa := range feeder.clusterState.Vpas {
+		for _, container := range vpa.AggregateContainerStates() {
+			container.IsUnderVPA = false
+		}
+		for _, container := range vpa.ContainersInitialAggregateState {
+			container.IsUnderVPA = false
+		}
+	}
+}
+
+// SweepAggregates garbage collects all aggregates/initial aggregates from each VPA where the
+// aggregate's container no longer exists.
+func (feeder *clusterStateFeeder) SweepAggregates() {
+	var aggregatesPruned int
+	var initialAggregatesPruned int
+
+	// TODO(jkyros): This only removes the container states from the VPA's aggregate states; there
+	// are still references to them in feeder.clusterState.aggregateStateMap, and those get
+	// garbage collected eventually by the rate-limited aggregate garbage collector later.
+	// Maybe we should clean those up here too, since we know which ones are stale?
+	now := time.Now()
+	for _, vpa := range feeder.clusterState.Vpas {
+		for containerKey, container := range vpa.AggregateContainerStates() {
+			if !container.IsUnderVPA && container.IsAggregateStale(now) {
+				klog.V(4).InfoS("Deleting Aggregate for VPA: container no longer present",
+					"namespace", vpa.ID.Namespace,
+					"vpaName", vpa.ID.VpaName,
+					"containerName", containerKey.ContainerName())
+				vpa.DeleteAggregation(containerKey)
+				aggregatesPruned++
+			}
+		}
+		for containerKey, container := range vpa.ContainersInitialAggregateState {
+			if !container.IsUnderVPA && container.IsAggregateStale(now) {
+				klog.V(4).InfoS("Deleting Initial Aggregate for VPA: container no longer present",
+					"namespace", vpa.ID.Namespace,
+					"vpaName", vpa.ID.VpaName,
+					"containerName", containerKey)
+				delete(vpa.ContainersInitialAggregateState, containerKey)
+				initialAggregatesPruned++
+			}
+		}
+	}
+	if initialAggregatesPruned > 0 || aggregatesPruned > 0 {
+		klog.InfoS("Pruned aggregate and initial aggregate containers", "aggregatesPruned", aggregatesPruned, "initialAggregatesPruned", initialAggregatesPruned)
+	}
+}
+
 // LoadPods loads pods into the cluster state.
 func (feeder *clusterStateFeeder) LoadPods() {
 	podSpecs, err := feeder.specClient.GetPodSpecs()
diff --git a/vertical-pod-autoscaler/pkg/recommender/logic/recommender.go b/vertical-pod-autoscaler/pkg/recommender/logic/recommender.go
index 8aaf9cbd5e9..5d553466dc1 100644
--- a/vertical-pod-autoscaler/pkg/recommender/logic/recommender.go
+++ b/vertical-pod-autoscaler/pkg/recommender/logic/recommender.go
@@ -39,7 +39,7 @@ var (
 
 // PodResourceRecommender computes resource recommendation for a Vpa object.
 type PodResourceRecommender interface {
-	GetRecommendedPodResources(containerNameToAggregateStateMap model.ContainerNameToAggregateStateMap) RecommendedPodResources
+	GetRecommendedPodResources(containerNameToAggregateStateMap model.ContainerNameToAggregateStateMap, containersPerPod int) RecommendedPodResources
 }
 
 // RecommendedPodResources is a Map from container name to recommended resources.
@@ -65,13 +65,13 @@ type podResourceRecommender struct {
 	upperBoundMemory MemoryEstimator
 }
 
-func (r *podResourceRecommender) GetRecommendedPodResources(containerNameToAggregateStateMap model.ContainerNameToAggregateStateMap) RecommendedPodResources {
+func (r *podResourceRecommender) GetRecommendedPodResources(containerNameToAggregateStateMap model.ContainerNameToAggregateStateMap, containersPerPod int) RecommendedPodResources {
 	var recommendation = make(RecommendedPodResources)
 	if len(containerNameToAggregateStateMap) == 0 {
 		return recommendation
 	}
 
-	fraction := 1.0 / float64(len(containerNameToAggregateStateMap))
+	fraction := 1.0 / float64(containersPerPod)
 	minCPU := model.ScaleResource(model.CPUAmountFromCores(*podMinCPUMillicores*0.001), fraction)
 	minMemory := model.ScaleResource(model.MemoryAmountFromBytes(*podMinMemoryMb*1024*1024), fraction)
diff --git a/vertical-pod-autoscaler/pkg/recommender/logic/recommender_test.go b/vertical-pod-autoscaler/pkg/recommender/logic/recommender_test.go
index 5824745a461..54f70af7bb8 100644
--- a/vertical-pod-autoscaler/pkg/recommender/logic/recommender_test.go
+++ b/vertical-pod-autoscaler/pkg/recommender/logic/recommender_test.go
@@ -40,7 +40,7 @@ func TestMinResourcesApplied(t *testing.T) {
 		"container-1": &model.AggregateContainerState{},
 	}
 
-	recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap)
+	recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap, len(containerNameToAggregateStateMap))
 	assert.Equal(t, model.CPUAmountFromCores(*podMinCPUMillicores/1000), recommendedResources["container-1"].Target[model.ResourceCPU])
 	assert.Equal(t, model.MemoryAmountFromBytes(*podMinMemoryMb*1024*1024), recommendedResources["container-1"].Target[model.ResourceMemory])
 }
@@ -63,7 +63,7 @@ func TestMinResourcesSplitAcrossContainers(t *testing.T) {
 		"container-2": &model.AggregateContainerState{},
 	}
 
-	recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap)
+	recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap, len(containerNameToAggregateStateMap))
 	assert.Equal(t, model.CPUAmountFromCores((*podMinCPUMillicores/1000)/2), recommendedResources["container-1"].Target[model.ResourceCPU])
 	assert.Equal(t, model.CPUAmountFromCores((*podMinCPUMillicores/1000)/2), recommendedResources["container-2"].Target[model.ResourceCPU])
 	assert.Equal(t, model.MemoryAmountFromBytes((*podMinMemoryMb*1024*1024)/2), recommendedResources["container-1"].Target[model.ResourceMemory])
@@ -90,7 +90,7 @@ func TestControlledResourcesFiltered(t *testing.T) {
 		},
 	}
 
-	recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap)
+	recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap, len(containerNameToAggregateStateMap))
 	assert.Contains(t, recommendedResources[containerName].Target, model.ResourceMemory)
 	assert.Contains(t, recommendedResources[containerName].LowerBound, model.ResourceMemory)
 	assert.Contains(t, recommendedResources[containerName].UpperBound, model.ResourceMemory)
@@ -119,7 +119,7 @@ func TestControlledResourcesFilteredDefault(t *testing.T) {
 		},
 	}
 
-	recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap)
+	recommendedResources := recommender.GetRecommendedPodResources(containerNameToAggregateStateMap, len(containerNameToAggregateStateMap))
 	assert.Contains(t, recommendedResources[containerName].Target, model.ResourceMemory)
 	assert.Contains(t, recommendedResources[containerName].LowerBound, model.ResourceMemory)
 	assert.Contains(t, recommendedResources[containerName].UpperBound, model.ResourceMemory)
diff --git a/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go b/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go
index 8ee9cc8637d..76014778abb 100644
--- a/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go
+++ b/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go
@@ -36,6 +36,7 @@ limitations under the License.
 package model
 
 import (
+	"flag"
 	"fmt"
 	"math"
 	"time"
@@ -46,6 +47,10 @@ import (
 	"k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/util"
 )
 
+var (
+	globalPruningGracePeriodDuration = flag.Duration("pruning-grace-period-duration", 24*365*100*time.Hour, `The grace period for deleting stale aggregates and recommendations. By default, set to 100 years, effectively disabling it.`)
+)
+
 // ContainerNameToAggregateStateMap maps a container name to AggregateContainerState
 // that aggregates state of containers with that name.
 type ContainerNameToAggregateStateMap map[string]*AggregateContainerState
@@ -110,6 +115,8 @@ type AggregateContainerState struct {
 	UpdateMode          *vpa_types.UpdateMode
 	ScalingMode         *vpa_types.ContainerScalingMode
 	ControlledResources *[]ResourceName
+	LastUpdateTime      time.Time
+	PruningGracePeriod  time.Duration
 }
 
 // GetLastRecommendation returns last recorded recommendation.
@@ -143,6 +150,11 @@ func (a *AggregateContainerState) GetControlledResources() []ResourceName {
 	return DefaultControlledResources
 }
 
+// IsAggregateStale returns true if the last update time is past its grace period and the aggregate should be pruned.
+func (a *AggregateContainerState) IsAggregateStale(now time.Time) bool {
+	return now.After(a.LastUpdateTime.Add(a.PruningGracePeriod))
+}
+
 // MarkNotAutoscaled registers that this container state is not controlled by
 // a VPA object.
 func (a *AggregateContainerState) MarkNotAutoscaled() {
@@ -171,10 +183,12 @@ func (a *AggregateContainerState) MergeContainerState(other *AggregateContainerS
 // NewAggregateContainerState returns a new, empty AggregateContainerState.
 func NewAggregateContainerState() *AggregateContainerState {
 	config := GetAggregationsConfig()
+	now := time.Now()
 	return &AggregateContainerState{
 		AggregateCPUUsage:    util.NewDecayingHistogram(config.CPUHistogramOptions, config.CPUHistogramDecayHalfLife),
 		AggregateMemoryPeaks: util.NewDecayingHistogram(config.MemoryHistogramOptions, config.MemoryHistogramDecayHalfLife),
-		CreationTime:         time.Now(),
+		CreationTime:         now,
+		LastUpdateTime:       now,
 	}
 }
@@ -289,6 +303,15 @@ func (a *AggregateContainerState) UpdateFromPolicy(resourcePolicy *vpa_types.Con
 	}
 }
 
+// UpdatePruningGracePeriod updates an aggregate state with a containerPruningGracePeriod, or with the global pruning duration if nil.
+func (a *AggregateContainerState) UpdatePruningGracePeriod(containerPruningGracePeriod *metav1.Duration) {
+	if containerPruningGracePeriod != nil {
+		a.PruningGracePeriod = containerPruningGracePeriod.Duration
+	} else {
+		a.PruningGracePeriod = *globalPruningGracePeriodDuration
+	}
+}
+
 // AggregateStateByContainerName takes a set of AggregateContainerStates and merge them
 // grouping by the container name. The result is a map from the container name to the aggregation
 // from all input containers with the given name.
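The mark-and-sweep behavior built from IsUnderVPA, LastUpdateTime, and IsAggregateStale above can be summed up in a minimal, self-contained sketch. This is not part of the patch: the aggregate type is a stand-in for AggregateContainerState, and the 24-hour grace period and container names are illustrative.

// Sketch only: mirrors the MarkAggregates/SweepAggregates staleness rule.
package main

import (
	"fmt"
	"time"
)

type aggregate struct {
	IsUnderVPA         bool          // true while LoadPods still sees a live container behind this aggregate
	LastUpdateTime     time.Time     // refreshed whenever the container is observed
	PruningGracePeriod time.Duration // per-container policy value, else the global flag default
}

// isAggregateStale mirrors AggregateContainerState.IsAggregateStale: the
// aggregate is prunable once now is past LastUpdateTime plus the grace period.
func (a aggregate) isAggregateStale(now time.Time) bool {
	return now.After(a.LastUpdateTime.Add(a.PruningGracePeriod))
}

func main() {
	now := time.Now()
	aggs := map[string]aggregate{
		// Seen this loop: marked back to IsUnderVPA=true, so never swept.
		"live": {IsUnderVPA: true, LastUpdateTime: now, PruningGracePeriod: 24 * time.Hour},
		// Container gone but still inside its grace period: kept for now.
		"recent": {IsUnderVPA: false, LastUpdateTime: now.Add(-1 * time.Hour), PruningGracePeriod: 24 * time.Hour},
		// Container gone and past its grace period: swept.
		"expired": {IsUnderVPA: false, LastUpdateTime: now.Add(-25 * time.Hour), PruningGracePeriod: 24 * time.Hour},
	}
	for name, agg := range aggs {
		// Sweep only what is both unmarked and stale, as SweepAggregates does.
		fmt.Printf("%s: prune=%v\n", name, !agg.IsUnderVPA && agg.isAggregateStale(now))
	}
}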
diff --git a/vertical-pod-autoscaler/pkg/recommender/model/cluster.go b/vertical-pod-autoscaler/pkg/recommender/model/cluster.go
index 8fd2fd8f030..4ea8fde839c 100644
--- a/vertical-pod-autoscaler/pkg/recommender/model/cluster.go
+++ b/vertical-pod-autoscaler/pkg/recommender/model/cluster.go
@@ -130,29 +130,43 @@ func (cluster *ClusterState) AddOrUpdatePod(podID PodID, newLabels labels.Set, p
 	}
 
 	newlabelSetKey := cluster.getLabelSetKey(newLabels)
-	if podExists && pod.labelSetKey != newlabelSetKey {
-		// This Pod is already counted in the old VPA, remove the link.
-		cluster.removePodFromItsVpa(pod)
-	}
-	if !podExists || pod.labelSetKey != newlabelSetKey {
+	if pod.labelSetKey != newlabelSetKey {
+		if podExists {
+			// This Pod is already counted in the old VPA, remove the link.
+			cluster.removePodFromItsVpa(pod)
+		}
 		pod.labelSetKey = newlabelSetKey
 		// Set the links between the containers and aggregations based on the current pod labels.
 		for containerName, container := range pod.Containers {
 			containerID := ContainerID{PodID: podID, ContainerName: containerName}
 			container.aggregator = cluster.findOrCreateAggregateContainerState(containerID)
 		}
-
-		cluster.addPodToItsVpa(pod)
+		cluster.setVPAContainersPerPod(pod, true)
+	} else if !podExists {
+		cluster.setVPAContainersPerPod(pod, true)
+	} else if len(pod.Containers) > 1 {
+		// Tally the number of containers for later, when we're averaging the recommendations, if there's more than one container.
+		cluster.setVPAContainersPerPod(pod, false)
 	}
 	pod.Phase = phase
 }
 
-// addPodToItsVpa increases the count of Pods associated with a VPA object.
-// Does a scan similar to findOrCreateAggregateContainerState so could be optimized if needed.
-func (cluster *ClusterState) addPodToItsVpa(pod *PodState) {
+// setVPAContainersPerPod sets the number of containers per pod seen for pods connected to this VPA
+// so that later, when we're splitting the minimum recommendations over containers, we're splitting them over
+// the correct number and not just the number of aggregates that have *ever* been present. (We don't want minimum resources
+// to erroneously shrink, either.)
+// If addPodToItsVpa is true, it also increments the pod count for the VPA.
+func (cluster *ClusterState) setVPAContainersPerPod(pod *PodState, addPodToItsVpa bool) {
 	for _, vpa := range cluster.Vpas {
 		if vpa_utils.PodLabelsMatchVPA(pod.ID.Namespace, cluster.labelSetMap[pod.labelSetKey], vpa.ID.Namespace, vpa.PodSelector) {
-			vpa.PodCount++
+			// We want the "high water mark" of the most containers in the pod in the event
+			// that we're rolling out a pod that has an additional container.
+			if len(pod.Containers) > vpa.ContainersPerPod {
+				vpa.ContainersPerPod = len(pod.Containers)
+			}
+			if addPodToItsVpa {
+				vpa.PodCount++
+			}
 		}
 	}
 }
@@ -197,12 +211,16 @@ func (cluster *ClusterState) AddOrUpdateContainer(containerID ContainerID, reque
 	if !podExists {
 		return NewKeyError(containerID.PodID)
 	}
+	aggregateState := cluster.findOrCreateAggregateContainerState(containerID)
 	if container, containerExists := pod.Containers[containerID.ContainerName]; !containerExists {
-		cluster.findOrCreateAggregateContainerState(containerID)
 		pod.Containers[containerID.ContainerName] = NewContainerState(request, NewContainerStateAggregatorProxy(cluster, containerID))
 	} else {
 		// Container already exists. Possibly update the request.
 		container.Request = request
+		// Mark this container as still managed so the aggregates don't get garbage collected
+		aggregateState.IsUnderVPA = true
+		// Update the last update time so we potentially don't garbage collect it too soon
+		aggregateState.LastUpdateTime = time.Now()
 	}
 	return nil
 }
@@ -259,13 +277,23 @@ func (cluster *ClusterState) AddOrUpdateVpa(apiObject *vpa_types.VerticalPodAuto
 	}
 
 	vpa, vpaExists := cluster.Vpas[vpaID]
-	if vpaExists && (vpa.PodSelector.String() != selector.String()) {
-		// Pod selector was changed. Delete the VPA object and recreate
-		// it with the new selector.
-		if err := cluster.DeleteVpa(vpaID); err != nil {
-			return err
+	if vpaExists {
+		if vpa.PodSelector.String() != selector.String() {
+			// Pod selector was changed. Delete the VPA object and recreate
+			// it with the new selector.
+			if err := cluster.DeleteVpa(vpaID); err != nil {
+				return err
+			}
+
+			vpaExists = false
+		} else {
+			// Update the pruningGracePeriod to ensure a potential new grace period is applied.
+			// This prevents an old, excessively long grace period from persisting and
+			// potentially causing the VPA to keep stale aggregates with an outdated grace period.
+			for key, containerState := range vpa.aggregateContainerStates {
+				containerState.UpdatePruningGracePeriod(vpa_utils.GetContainerPruningGracePeriod(key.ContainerName(), apiObject.Spec.ResourcePolicy))
+			}
 		}
-		vpaExists = false
 	}
 	if !vpaExists {
 		vpa = NewVpa(vpaID, selector, apiObject.CreationTimestamp.Time)
@@ -275,6 +303,11 @@ func (cluster *ClusterState) AddOrUpdateVpa(apiObject *vpa_types.VerticalPodAuto
 		}
 		vpa.PodCount = len(cluster.GetMatchingPods(vpa))
 	}
+
+	// Default this to the minimum; we will tally the true number when we load the pods later.
+	// TODO(jkyros): This is gross; it depends on the order things currently load in, but
+	// that might not be the case someday.
+	vpa.ContainersPerPod = 1
 	vpa.TargetRef = apiObject.Spec.TargetRef
 	vpa.Annotations = annotationsMap
 	vpa.Conditions = conditionsMap
diff --git a/vertical-pod-autoscaler/pkg/recommender/model/vpa.go b/vertical-pod-autoscaler/pkg/recommender/model/vpa.go
index b4f32e07f8c..fc3d54c0350 100644
--- a/vertical-pod-autoscaler/pkg/recommender/model/vpa.go
+++ b/vertical-pod-autoscaler/pkg/recommender/model/vpa.go
@@ -110,6 +110,11 @@ type Vpa struct {
 	TargetRef *autoscaling.CrossVersionObjectReference
 	// PodCount contains number of live Pods matching a given VPA object.
 	PodCount int
+	// ContainersPerPod contains the "high water mark" of the number of containers
+	// per pod to average the recommendation across. Used to make sure we aren't
+	// "fractionalizing" minResources erroneously during a redeploy when a pod's
+	// container is removed or renamed.
+	ContainersPerPod int
 }
 
 // NewVpa returns a new Vpa with a given ID and pod selector. Doesn't set the
@@ -155,6 +160,7 @@ func (vpa *Vpa) UseAggregationIfMatching(aggregationKey AggregateStateKey, aggre
 		vpa.aggregateContainerStates[aggregationKey] = aggregation
 		aggregation.IsUnderVPA = true
 		aggregation.UpdateMode = vpa.UpdateMode
+		aggregation.UpdatePruningGracePeriod(vpa_api_util.GetContainerPruningGracePeriod(aggregationKey.ContainerName(), vpa.ResourcePolicy))
 		aggregation.UpdateFromPolicy(vpa_api_util.GetContainerResourcePolicy(aggregationKey.ContainerName(), vpa.ResourcePolicy))
 	}
 }
@@ -209,6 +215,11 @@ func (vpa *Vpa) AggregateStateByContainerName() ContainerNameToAggregateStateMap
 	return containerNameToAggregateStateMap
 }
 
+// AggregateContainerStates returns the underlying internal aggregate state map.
+func (vpa *Vpa) AggregateContainerStates() aggregateContainerStatesMap {
+	return vpa.aggregateContainerStates
+}
+
 // HasRecommendation returns if the VPA object contains any recommendation
 func (vpa *Vpa) HasRecommendation() bool {
 	return (vpa.Recommendation != nil) && len(vpa.Recommendation.ContainerRecommendations) > 0
diff --git a/vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go b/vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go
index cdc8d0fee1a..11e690b5ab2 100644
--- a/vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go
+++ b/vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go
@@ -31,6 +31,7 @@ import (
 
 var (
 	anyTime = time.Unix(0, 0)
+	// TODO(maxcao13): write tests for new container policy field
 )
 
 func TestMergeAggregateContainerState(t *testing.T) {
diff --git a/vertical-pod-autoscaler/pkg/recommender/routines/recommender.go b/vertical-pod-autoscaler/pkg/recommender/routines/recommender.go
index 1f456a80c18..573602b33fb 100644
--- a/vertical-pod-autoscaler/pkg/recommender/routines/recommender.go
+++ b/vertical-pod-autoscaler/pkg/recommender/routines/recommender.go
@@ -91,7 +91,7 @@ func (r *recommender) UpdateVPAs() {
 		if !found {
 			continue
 		}
-		resources := r.podResourceRecommender.GetRecommendedPodResources(GetContainerNameToAggregateStateMap(vpa))
+		resources := r.podResourceRecommender.GetRecommendedPodResources(GetContainerNameToAggregateStateMap(vpa), vpa.ContainersPerPod)
 		had := vpa.HasRecommendation()
 
 		listOfResourceRecommendation := logic.MapToListOfRecommendedContainerResources(resources)
@@ -150,9 +150,15 @@ func (r *recommender) RunOnce() {
 	r.clusterStateFeeder.LoadVPAs(ctx)
 	timer.ObserveStep("LoadVPAs")
 
+	r.clusterStateFeeder.MarkAggregates()
+	timer.ObserveStep("MarkAggregates")
+
 	r.clusterStateFeeder.LoadPods()
 	timer.ObserveStep("LoadPods")
 
+	r.clusterStateFeeder.SweepAggregates()
+	timer.ObserveStep("SweepAggregates")
+
 	r.clusterStateFeeder.LoadRealTimeMetrics()
 	timer.ObserveStep("LoadMetrics")
 	klog.V(3).InfoS("ClusterState is tracking", "pods", len(r.clusterState.Pods), "vpas", len(r.clusterState.Vpas))
diff --git a/vertical-pod-autoscaler/pkg/utils/vpa/api.go b/vertical-pod-autoscaler/pkg/utils/vpa/api.go
index 8fbf11c9835..ebd2afc566c 100644
--- a/vertical-pod-autoscaler/pkg/utils/vpa/api.go
+++ b/vertical-pod-autoscaler/pkg/utils/vpa/api.go
@@ -218,6 +218,15 @@ func GetContainerControlledValues(name string, vpaResourcePolicy *vpa_types.PodR
 	return *containerPolicy.ControlledValues
 }
 
+// GetContainerPruningGracePeriod returns the pruning grace period for a container.
+func GetContainerPruningGracePeriod(containerName string, vpaResourcePolicy *vpa_types.PodResourcePolicy) (gracePeriod *meta.Duration) {
+	containerPolicy := GetContainerResourcePolicy(containerName, vpaResourcePolicy)
+	if containerPolicy != nil {
+		gracePeriod = containerPolicy.PruningGracePeriod
+	}
+	return
+}
+
 // CreateOrUpdateVpaCheckpoint updates the status field of the VPA Checkpoint API object.
 // If the object doesn't exist, it is created.
 func CreateOrUpdateVpaCheckpoint(vpaCheckpointClient vpa_api.VerticalPodAutoscalerCheckpointInterface,
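For reference, here is a hedged usage sketch of the new field, built with the Go API types from the types.go hunk above. It assumes PruningGracePeriod lands exactly as declared there; the CronJob name, namespace, and the 48-hour duration are illustrative assumptions, not part of the patch.

// Sketch only: constructing a VPA whose recommendations for vanished
// containers are kept for 48 hours instead of being pruned immediately.
package main

import (
	"fmt"
	"time"

	autoscaling "k8s.io/api/autoscaling/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
)

func main() {
	vpa := &vpa_types.VerticalPodAutoscaler{
		ObjectMeta: metav1.ObjectMeta{Name: "cronjob-vpa", Namespace: "default"},
		Spec: vpa_types.VerticalPodAutoscalerSpec{
			// Pointing targetRef at a CronJob is the motivating case: its Jobs'
			// containers come and go between recommendation loops.
			TargetRef: &autoscaling.CrossVersionObjectReference{
				APIVersion: "batch/v1",
				Kind:       "CronJob",
				Name:       "nightly-report", // hypothetical workload
			},
			ResourcePolicy: &vpa_types.PodResourcePolicy{
				ContainerPolicies: []vpa_types.ContainerResourcePolicy{{
					ContainerName: "*",
					// Override the CronJob default of 24 hours with 48 hours.
					PruningGracePeriod: &metav1.Duration{Duration: 48 * time.Hour},
				}},
			},
		},
	}
	fmt.Println(vpa.Spec.ResourcePolicy.ContainerPolicies[0].PruningGracePeriod.Duration)
}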