From 4ff68f5d573d666563abd3d4a37dad2839fe3465 Mon Sep 17 00:00:00 2001
From: Mahmoud Atwa
Date: Mon, 6 Nov 2023 15:43:26 +0000
Subject: [PATCH] Fix some comments + replace pod-time-buffer-enabled flag with ignore-scheduler-processing

---
 .../config/autoscaling_options.go             |  6 ++-
 .../podlistprocessor/clear_tpu_request.go     | 39 +++++++++++++++++++
 .../podlistprocessor/filter_out_expendable.go |  3 +-
 .../podlistprocessor/pod_list_processor.go    |  1 +
 cluster-autoscaler/core/static_autoscaler.go  | 16 +++++---
 cluster-autoscaler/main.go                    |  4 +-
 .../utils/kubernetes/listers.go               | 10 ++---
 7 files changed, 61 insertions(+), 18 deletions(-)
 create mode 100644 cluster-autoscaler/core/podlistprocessor/clear_tpu_request.go

diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go
index b27a4d32fc34..40441206a0f7 100644
--- a/cluster-autoscaler/config/autoscaling_options.go
+++ b/cluster-autoscaler/config/autoscaling_options.go
@@ -277,6 +277,8 @@ type AutoscalingOptions struct {
 	// dynamicNodeDeleteDelayAfterTaintEnabled is used to enable/disable dynamic adjustment of NodeDeleteDelayAfterTaint
 	// based on the latency between the CA and the api-server
 	DynamicNodeDeleteDelayAfterTaintEnabled bool
-	// Enables waiting for a time buffer so that the oldest unschedulable pod should be before starting scale up.
-	EnablePodTimeBuffer bool
+	// IgnoreSchedulerProcessing is used to signal whether CA should wait for the scheduler
+	// to mark pods as unschedulable, or should process both marked and unmarked pods.
+	// It also controls whether CA waits for the pod time buffer before triggering a scale-up.
+	IgnoreSchedulerProcessing bool
 }
diff --git a/cluster-autoscaler/core/podlistprocessor/clear_tpu_request.go b/cluster-autoscaler/core/podlistprocessor/clear_tpu_request.go
new file mode 100644
index 000000000000..f26104347eaa
--- /dev/null
+++ b/cluster-autoscaler/core/podlistprocessor/clear_tpu_request.go
@@ -0,0 +1,39 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package podlistprocessor
+
+import (
+	apiv1 "k8s.io/api/core/v1"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/tpu"
+)
+
+type clearTpuRequests struct {
+}
+
+// NewClearTPURequestsPodListProcessor creates a PodListProcessor which clears TPU requests in pods
+func NewClearTPURequestsPodListProcessor() *clearTpuRequests {
+	return &clearTpuRequests{}
+}
+
+// Process removes pods' TPU requests
+func (p *clearTpuRequests) Process(context *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) {
+	return tpu.ClearTPURequests(pods), nil
+}
+
+func (p *clearTpuRequests) CleanUp() {
+}
diff --git a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go
index c684901c090b..14b17b445831 100644
--- a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go
+++ b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go
@@ -34,7 +34,6 @@ func NewFilterOutExpendablePodListProcessor() *filterOutExpendable {
 // Process filters out pods which are expendable and adds pods which is waiting for lower priority pods preemption to the cluster snapshot
 func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) {
-	klog.V(4).Infof("Filtering out expendable pods")
 	nodes, err := context.AllNodeLister().List()
 	if err != nil {
 		return nil, err
 	}
@@ -55,7 +54,7 @@ func (p *filterOutExpendable) addPreemptingPodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error {
 	for _, p := range pods {
 		if err := ctx.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil {
-			klog.Errorf("Failed to update snapshot with pod %s waiting for preemption", err)
+			klog.Errorf("Failed to update snapshot with pod %s/%s waiting for preemption: %v", p.Namespace, p.Name, err)
 			return caerrors.ToAutoscalerError(caerrors.InternalError, err)
 		}
 	}
diff --git a/cluster-autoscaler/core/podlistprocessor/pod_list_processor.go b/cluster-autoscaler/core/podlistprocessor/pod_list_processor.go
index c6948ad7c115..ccbc5dfc8183 100644
--- a/cluster-autoscaler/core/podlistprocessor/pod_list_processor.go
+++ b/cluster-autoscaler/core/podlistprocessor/pod_list_processor.go
@@ -32,6 +32,7 @@ type defaultPodListProcessor struct {
 func NewDefaultPodListProcessor(predicateChecker predicatechecker.PredicateChecker) *defaultPodListProcessor {
 	return &defaultPodListProcessor{
 		processors: []pods.PodListProcessor{
+			NewClearTPURequestsPodListProcessor(),
 			NewFilterOutExpendablePodListProcessor(),
 			NewCurrentlyDrainedNodesPodListProcessor(),
 			NewFilterOutSchedulablePodListProcessor(predicateChecker),
diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go
index 8e7d27caf55f..d9a09d34ff54 100644
--- a/cluster-autoscaler/core/static_autoscaler.go
+++ b/cluster-autoscaler/core/static_autoscaler.go
@@ -308,7 +308,11 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
 		klog.Errorf("Failed to list pods: %v", err)
 		return caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
 	}
-	originalScheduledPods, unschedulablePods, schedulerUnprocessed := kube_util.ScheduledPods(pods), kube_util.UnschedulablePods(pods), kube_util.SchedulerUnprocessedPods(pods)
+	originalScheduledPods, unschedulablePods := kube_util.ScheduledPods(pods), kube_util.UnschedulablePods(pods)
+	schedulerUnprocessed := make([]*apiv1.Pod, 0, 0)
+	if a.IgnoreSchedulerProcessing {
+		schedulerUnprocessed = kube_util.SchedulerUnprocessedPods(pods)
+	}
 
 	// Update cluster resource usage metrics
 	coresTotal, memoryTotal := calculateCoresMemoryTotal(allNodes, currentTime)
@@ -450,10 +454,12 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
 
 	metrics.UpdateLastTime(metrics.Autoscaling, time.Now())
 
+	// SchedulerUnprocessed might be zero here if it was disabled
 	metrics.UpdateUnschedulablePodsCount(len(unschedulablePods), len(schedulerUnprocessed))
-	// Treat unknown pods as unschedulable, pod list processor will remove schedulable pods
-	unschedulablePods = append(unschedulablePods, schedulerUnprocessed...)
-
+	if a.IgnoreSchedulerProcessing {
+		// Treat unknown pods as unschedulable, pod list processor will remove schedulable pods
+		unschedulablePods = append(unschedulablePods, schedulerUnprocessed...)
+	}
 	// Upcoming nodes are recently created nodes that haven't registered in the cluster yet, or haven't become ready yet.
 	upcomingCounts, registeredUpcoming := a.clusterStateRegistry.GetUpcomingNodes()
 	// For each upcoming node we inject a placeholder node faked to appear ready into the cluster snapshot, so that we can pack unschedulable pods on
@@ -537,7 +543,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
 	} else if a.MaxNodesTotal > 0 && len(readyNodes) >= a.MaxNodesTotal {
 		scaleUpStatus.Result = status.ScaleUpNoOptionsAvailable
 		klog.V(1).Info("Max total nodes in cluster reached")
-	} else if a.EnablePodTimeBuffer && allPodsAreNew(unschedulablePodsToHelp, currentTime) {
+	} else if !a.IgnoreSchedulerProcessing && allPodsAreNew(unschedulablePodsToHelp, currentTime) {
 		// The assumption here is that these pods have been created very recently and probably there
 		// is more pods to come. In theory we could check the newest pod time but then if pod were created
 		// slowly but at the pace of 1 every 2 seconds then no scale up would be triggered for long time.
diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go
index 13ccec459353..543f535dd325 100644
--- a/cluster-autoscaler/main.go
+++ b/cluster-autoscaler/main.go
@@ -243,7 +243,7 @@ var (
 	maxAllocatableDifferenceRatio = flag.Float64("max-allocatable-difference-ratio", config.DefaultMaxAllocatableDifferenceRatio, "Maximum difference in allocatable resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's allocatable resource.")
 	forceDaemonSets = flag.Bool("force-ds", false, "Blocks scale-up of node groups too small for all suitable Daemon Sets pods.")
 	dynamicNodeDeleteDelayAfterTaintEnabled = flag.Bool("dynamic-node-delete-delay-after-taint-enabled", false, "Enables dynamic adjustment of NodeDeleteDelayAfterTaint based of the latency between CA and api-server")
-	enablePodTimeBuffer = flag.Bool("pod-time-buffer-enabled", true, "Enables waiting for a time buffer so that the oldest unschedulable pod should be before starting scale up.")
+	ignoreSchedulerProcessing = flag.Bool("ignore-scheduler-processing", false, "If true, cluster autoscaler will not wait for the scheduler to mark pods as unschedulable, and will process both marked and unmarked pods (schedulable pods will be filtered out before scaling up). It also disables waiting for the pod time buffer before triggering a scale-up.")
 )
 
 func isFlagPassed(name string) bool {
@@ -391,7 +391,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
 			MaxFreeDifferenceRatio: *maxFreeDifferenceRatio,
 		},
 		DynamicNodeDeleteDelayAfterTaintEnabled: *dynamicNodeDeleteDelayAfterTaintEnabled,
-		EnablePodTimeBuffer: *enablePodTimeBuffer,
+		IgnoreSchedulerProcessing: *ignoreSchedulerProcessing,
 	}
 }
 
diff --git a/cluster-autoscaler/utils/kubernetes/listers.go b/cluster-autoscaler/utils/kubernetes/listers.go
index d665b8a6310b..bb8c70e0000e 100644
--- a/cluster-autoscaler/utils/kubernetes/listers.go
+++ b/cluster-autoscaler/utils/kubernetes/listers.go
@@ -205,14 +205,10 @@ func SchedulerUnprocessedPods(allPods []*apiv1.Pod) []*apiv1.Pod {
 func UnschedulablePods(allPods []*apiv1.Pod) []*apiv1.Pod {
 	var unschedulablePods []*apiv1.Pod
 	for _, pod := range allPods {
-		if pod.Spec.NodeName == "" {
-			_, condition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled)
-			if condition != nil && condition.Status == apiv1.ConditionFalse && condition.Reason == apiv1.PodReasonUnschedulable {
-				if pod.GetDeletionTimestamp() == nil {
-					unschedulablePods = append(unschedulablePods, pod)
-				}
-			}
+		if !isUnschedulable(pod) {
+			continue
 		}
+		unschedulablePods = append(unschedulablePods, pod)
 	}
 	return unschedulablePods
 }