Skip to content

Commit

Permalink
Fix some comments + replace pod-time-buffer-enabled flag with ignore-…
Browse files Browse the repository at this point in the history
…scheduler-processing
  • Loading branch information
atwamahmoud committed Nov 6, 2023
1 parent 38522c8 commit 4ff68f5
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 18 deletions.
6 changes: 4 additions & 2 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ type AutoscalingOptions struct {
// DynamicNodeDeleteDelayAfterTaintEnabled is used to enable/disable dynamic adjustment of NodeDeleteDelayAfterTaint
// based on the latency between the CA and the api-server.
DynamicNodeDeleteDelayAfterTaintEnabled bool
// EnablePodTimeBuffer enables waiting for a time buffer, so that the oldest unschedulable pod is old enough before a scale-up is started.
EnablePodTimeBuffer bool
// IgnoreSchedulerProcessing is used to signal whether CA will/won't wait
// for the scheduler to mark pods as unschedulable, and will process both marked & non-marked pods.
// It also signals whether waiting for pod time buffers before triggering a scale-up is enabled/disabled.
IgnoreSchedulerProcessing bool
}
39 changes: 39 additions & 0 deletions cluster-autoscaler/core/podlistprocessor/clear_tpu_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podlistprocessor

import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/utils/tpu"
)

// clearTpuRequests is a PodListProcessor that strips TPU resource requests
// from the pods it is given.
type clearTpuRequests struct{}

// NewClearTPURequestsPodListProcessor creates a PodListProcessor which clears TPU requests in pods.
func NewClearTPURequestsPodListProcessor() *clearTpuRequests {
	processor := clearTpuRequests{}
	return &processor
}

// Process returns the given pods with their TPU resource requests cleared
// (see tpu.ClearTPURequests), so later processors and scheduling simulation
// do not trip over the custom TPU resource.
// The autoscaling context is unused; the parameter is blank to avoid
// shadowing the imported context package.
func (p *clearTpuRequests) Process(_ *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) {
	return tpu.ClearTPURequests(pods), nil
}

// CleanUp is a no-op: clearTpuRequests holds no state that needs releasing
// between autoscaler loops.
func (p *clearTpuRequests) CleanUp() {
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ func NewFilterOutExpendablePodListProcessor() *filterOutExpendable {

// Process filters out pods which are expendable and adds pods which is waiting for lower priority pods preemption to the cluster snapshot
func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) {
klog.V(4).Infof("Filtering out expendable pods")
nodes, err := context.AllNodeLister().List()
if err != nil {
return nil, err
Expand All @@ -55,7 +54,7 @@ func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods
func (p *filterOutExpendable) addPreemptingPodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error {
for _, p := range pods {
if err := ctx.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil {
klog.Errorf("Failed to update snapshot with pod %s waiting for preemption", err)
klog.Errorf("Failed to update snapshot with pod %s/%s waiting for preemption: %v", p.Namespace, p.Name, err)
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type defaultPodListProcessor struct {
func NewDefaultPodListProcessor(predicateChecker predicatechecker.PredicateChecker) *defaultPodListProcessor {
return &defaultPodListProcessor{
processors: []pods.PodListProcessor{
NewClearTPURequestsPodListProcessor(),
NewFilterOutExpendablePodListProcessor(),
NewCurrentlyDrainedNodesPodListProcessor(),
NewFilterOutSchedulablePodListProcessor(predicateChecker),
Expand Down
16 changes: 11 additions & 5 deletions cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,11 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
klog.Errorf("Failed to list pods: %v", err)
return caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
}
originalScheduledPods, unschedulablePods, schedulerUnprocessed := kube_util.ScheduledPods(pods), kube_util.UnschedulablePods(pods), kube_util.SchedulerUnprocessedPods(pods)
originalScheduledPods, unschedulablePods := kube_util.ScheduledPods(pods), kube_util.UnschedulablePods(pods)
schedulerUnprocessed := make([]*apiv1.Pod, 0, 0)
if a.IgnoreSchedulerProcessing {
schedulerUnprocessed = kube_util.SchedulerUnprocessedPods(pods)
}

// Update cluster resource usage metrics
coresTotal, memoryTotal := calculateCoresMemoryTotal(allNodes, currentTime)
Expand Down Expand Up @@ -450,10 +454,12 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr

metrics.UpdateLastTime(metrics.Autoscaling, time.Now())

// schedulerUnprocessed may be empty here if IgnoreSchedulerProcessing is disabled.
metrics.UpdateUnschedulablePodsCount(len(unschedulablePods), len(schedulerUnprocessed))
// Treat unknown pods as unschedulable, pod list processor will remove schedulable pods
unschedulablePods = append(unschedulablePods, schedulerUnprocessed...)

if a.IgnoreSchedulerProcessing {
// Treat unknown pods as unschedulable, pod list processor will remove schedulable pods
unschedulablePods = append(unschedulablePods, schedulerUnprocessed...)
}
// Upcoming nodes are recently created nodes that haven't registered in the cluster yet, or haven't become ready yet.
upcomingCounts, registeredUpcoming := a.clusterStateRegistry.GetUpcomingNodes()
// For each upcoming node we inject a placeholder node faked to appear ready into the cluster snapshot, so that we can pack unschedulable pods on
Expand Down Expand Up @@ -537,7 +543,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
} else if a.MaxNodesTotal > 0 && len(readyNodes) >= a.MaxNodesTotal {
scaleUpStatus.Result = status.ScaleUpNoOptionsAvailable
klog.V(1).Info("Max total nodes in cluster reached")
} else if a.EnablePodTimeBuffer && allPodsAreNew(unschedulablePodsToHelp, currentTime) {
} else if !a.IgnoreSchedulerProcessing && allPodsAreNew(unschedulablePodsToHelp, currentTime) {
// The assumption here is that these pods have been created very recently and probably there
// is more pods to come. In theory we could check the newest pod time but then if pod were created
// slowly but at the pace of 1 every 2 seconds then no scale up would be triggered for long time.
Expand Down
4 changes: 2 additions & 2 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ var (
maxAllocatableDifferenceRatio = flag.Float64("max-allocatable-difference-ratio", config.DefaultMaxAllocatableDifferenceRatio, "Maximum difference in allocatable resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's allocatable resource.")
forceDaemonSets = flag.Bool("force-ds", false, "Blocks scale-up of node groups too small for all suitable Daemon Sets pods.")
dynamicNodeDeleteDelayAfterTaintEnabled = flag.Bool("dynamic-node-delete-delay-after-taint-enabled", false, "Enables dynamic adjustment of NodeDeleteDelayAfterTaint based of the latency between CA and api-server")
enablePodTimeBuffer = flag.Bool("pod-time-buffer-enabled", true, "Enables waiting for a time buffer so that the oldest unschedulable pod should be before starting scale up.")
ignoreSchedulerProcessing = flag.Bool("ignore-scheduler-processing", false, "If true, cluster autoscaler will not wait for scheduler to mark pods as unschedulable and will process both marked & non-marked pods (Schedulable pods will be filtered before scaling-up) it will also disable waiting for pod time buffers before triggering a scale-up.")
)

func isFlagPassed(name string) bool {
Expand Down Expand Up @@ -391,7 +391,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
MaxFreeDifferenceRatio: *maxFreeDifferenceRatio,
},
DynamicNodeDeleteDelayAfterTaintEnabled: *dynamicNodeDeleteDelayAfterTaintEnabled,
EnablePodTimeBuffer: *enablePodTimeBuffer,
IgnoreSchedulerProcessing: *ignoreSchedulerProcessing,
}
}

Expand Down
10 changes: 3 additions & 7 deletions cluster-autoscaler/utils/kubernetes/listers.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,10 @@ func SchedulerUnprocessedPods(allPods []*apiv1.Pod) []*apiv1.Pod {
func UnschedulablePods(allPods []*apiv1.Pod) []*apiv1.Pod {
var unschedulablePods []*apiv1.Pod
for _, pod := range allPods {
if pod.Spec.NodeName == "" {
_, condition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled)
if condition != nil && condition.Status == apiv1.ConditionFalse && condition.Reason == apiv1.PodReasonUnschedulable {
if pod.GetDeletionTimestamp() == nil {
unschedulablePods = append(unschedulablePods, pod)
}
}
if !isUnschedulable(pod) {
continue;
}
unschedulablePods = append(unschedulablePods, pod)
}
return unschedulablePods
}
Expand Down

0 comments on commit 4ff68f5

Please sign in to comment.