
Commit 3548ba1

Fixed some review comments
atwamahmoud committed Oct 31, 2023
1 parent 1c5bbed commit 3548ba1
Showing 8 changed files with 89 additions and 57 deletions.
10 changes: 2 additions & 8 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -277,12 +277,6 @@ type AutoscalingOptions struct {
// dynamicNodeDeleteDelayAfterTaintEnabled is used to enable/disable dynamic adjustment of NodeDeleteDelayAfterTaint
// based on the latency between the CA and the api-server
DynamicNodeDeleteDelayAfterTaintEnabled bool
// UnschedulablePodTimeBuffer controls when scale-ups happen so that
// the oldest unschedulable pod is older than UnschedulablePodTimeBuffer
UnschedulablePodTimeBuffer time.Duration
// UnschedulablePodWithGpuTimeBuffer specifies how old should the oldest unschedulable pod with GPU be before starting scale up.
// The idea is that nodes with GPU are very expensive and we're ready to sacrifice
// a bit more latency to wait for more pods and make a more informed scale-up decision.
UnschedulablePodWithGpuTimeBuffer time.Duration
// unschedulablePodWithGpuTimeBuffer = 30 * time.Second
// EnablePodTimeBuffer enables waiting for a time buffer, so that the oldest unschedulable pod is old enough before a scale-up is started.
EnablePodTimeBuffer bool
}
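Net effect on the options struct, as a minimal sketch (field names taken from this diff, surrounding fields omitted): the two duration knobs disappear and only a boolean toggle remains, with the delays themselves becoming constants in static_autoscaler.go further down.

opts := config.AutoscalingOptions{
    DynamicNodeDeleteDelayAfterTaintEnabled: true,
    // UnschedulablePodTimeBuffer / UnschedulablePodWithGpuTimeBuffer no longer exist;
    // the 2s and 30s delays are now fixed constants, gated by this single toggle.
    EnablePodTimeBuffer: true,
}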
20 changes: 10 additions & 10 deletions cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go
@@ -24,35 +24,35 @@ import (
klog "k8s.io/klog/v2"
)

type filterOutExpandable struct {
type filterOutExpendable struct {
}

// NewFilterOutExpandablePodListProcessor creates a PodListProcessor filtering out expendable pods
func NewFilterOutExpandablePodListProcessor() *filterOutExpandable {
return &filterOutExpandable{}
// NewFilterOutExpendablePodListProcessor creates a PodListProcessor filtering out expendable pods
func NewFilterOutExpendablePodListProcessor() *filterOutExpendable {
return &filterOutExpendable{}
}

// Process filters out pods which are expendable and adds pods that are waiting for lower-priority pods' preemption to the cluster snapshot
func (p *filterOutExpandable) Process(context *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) {
klog.V(4).Infof("Filtering out expandable pods")
func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods []*apiv1.Pod) ([]*apiv1.Pod, error) {
klog.V(4).Infof("Filtering out expendable pods")
nodes, err := context.AllNodeLister().List()
if err != nil {
return nil, err
}
expendablePodsPriorityCutoff := context.AutoscalingOptions.ExpendablePodsPriorityCutoff

unschedulablePods, waitingForLowerPriorityPreemption := core_utils.FilterOutExpendableAndSplit(pods, nodes, expendablePodsPriorityCutoff)
if err = p.addPreemptiblePodsToSnapshot(waitingForLowerPriorityPreemption, context); err != nil {
if err = p.addPreemptingPodsToSnapshot(waitingForLowerPriorityPreemption, context); err != nil {
return nil, err
}

return unschedulablePods, nil
}

// addPreemptiblePodsToSnapshot modifies the snapshot simulating scheduling of pods waiting for preemption.
// addPreemptingPodsToSnapshot modifies the snapshot simulating scheduling of pods waiting for preemption.
// this is not strictly correct as we are not simulating preemption itself but it matches
// CA logic from before migration to scheduler framework. So let's keep it for now
func (p *filterOutExpandable) addPreemptiblePodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error {
func (p *filterOutExpendable) addPreemptingPodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error {
for _, p := range pods {
if err := ctx.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil {
klog.Errorf("Failed to update snapshot with pod %s waiting for preemption: %v", p.Name, err)
@@ -62,5 +62,5 @@ func (p *filterOutExpandable) addPreemptiblePodsToSnapshot(pods []*apiv1.Pod, ct
return nil
}

func (p *filterOutExpandable) CleanUp() {
func (p *filterOutExpendable) CleanUp() {
}
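A minimal usage sketch of the renamed processor; autoscalingCtx and allPods are placeholders rather than anything defined in this diff:

processor := NewFilterOutExpendablePodListProcessor()
// autoscalingCtx is an assumed *context.AutoscalingContext, allPods an assumed []*apiv1.Pod.
unschedulable, err := processor.Process(autoscalingCtx, allPods)
if err != nil {
    klog.Errorf("filtering out expendable pods failed: %v", err)
}
_ = unschedulable // the pods left after dropping expendable ones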
@@ -53,51 +53,51 @@ func TestFilterOutExpendable(t *testing.T) {
{
name: "non-expendable pods with priority >= to cutoff priority",
pods: []*apiv1.Pod{
test.BuildTestPod("p1", 1000, 1, getPrioritySetter(2)),
test.BuildTestPod("p2", 1000, 1, getPrioritySetter(3)),
test.BuildTestPod("p1", 1000, 1, priority(2)),
test.BuildTestPod("p2", 1000, 1, priority(3)),
},
wantPods: []*apiv1.Pod{
test.BuildTestPod("p1", 1000, 1, getPrioritySetter(2)),
test.BuildTestPod("p2", 1000, 1, getPrioritySetter(3)),
test.BuildTestPod("p1", 1000, 1, priority(2)),
test.BuildTestPod("p2", 1000, 1, priority(3)),
},
priorityCutoff: 2,
},
{
name: "single expednable pod",
pods: []*apiv1.Pod{
test.BuildTestPod("p", 1000, 1, getPrioritySetter(2)),
test.BuildTestPod("p", 1000, 1, priority(2)),
},
priorityCutoff: 3,
},
{
name: "single waiting-for-low-priority-preemption pod",
pods: []*apiv1.Pod{
test.BuildTestPod("p", 1000, 1, getNominatedNodeNameSetter("node-1")),
test.BuildTestPod("p", 1000, 1, nominatedNodeName("node-1")),
},
nodes: []*apiv1.Node{
test.BuildTestNode("node-1", 2400, 2400),
},
wantPodsInSnapshot: []*apiv1.Pod{
test.BuildTestPod("p", 1000, 1, getNominatedNodeNameSetter("node-1")),
test.BuildTestPod("p", 1000, 1, nominatedNodeName("node-1")),
},
},
{
name: "mixed expendable, non-expendable & waiting-for-low-priority-preemption pods",
pods: []*apiv1.Pod{
test.BuildTestPod("p1", 1000, 1, getPrioritySetter(3)),
test.BuildTestPod("p2", 1000, 1, getPrioritySetter(4)),
test.BuildTestPod("p3", 1000, 1, getPrioritySetter(1)),
test.BuildTestPod("p1", 1000, 1, priority(3)),
test.BuildTestPod("p2", 1000, 1, priority(4)),
test.BuildTestPod("p3", 1000, 1, priority(1)),
test.BuildTestPod("p4", 1000, 1),
test.BuildTestPod("p5", 1000, 1, getNominatedNodeNameSetter("node-1")),
test.BuildTestPod("p5", 1000, 1, nominatedNodeName("node-1")),
},
priorityCutoff: 2,
wantPods: []*apiv1.Pod{
test.BuildTestPod("p1", 1000, 1, getPrioritySetter(3)),
test.BuildTestPod("p2", 1000, 1, getPrioritySetter(4)),
test.BuildTestPod("p1", 1000, 1, priority(3)),
test.BuildTestPod("p2", 1000, 1, priority(4)),
test.BuildTestPod("p4", 1000, 1),
},
wantPodsInSnapshot: []*apiv1.Pod{
test.BuildTestPod("p5", 1000, 1, getNominatedNodeNameSetter("node-1")),
test.BuildTestPod("p5", 1000, 1, nominatedNodeName("node-1")),
},
nodes: []*apiv1.Node{
test.BuildTestNode("node-1", 2400, 2400),
@@ -107,7 +107,7 @@ func TestFilterOutExpendable(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
processor := NewFilterOutExpandablePodListProcessor()
processor := NewFilterOutExpendablePodListProcessor()
snapshot := clustersnapshot.NewBasicClusterSnapshot()
snapshot.AddNodes(tc.nodes)

@@ -141,12 +141,12 @@ func TestFilterOutExpendable(t *testing.T) {
}
}

func getPrioritySetter(priority int32) func(*apiv1.Pod) {
func priority(priority int32) func(*apiv1.Pod) {
return func(pod *apiv1.Pod) {
pod.Spec.Priority = &priority
}
}
func getNominatedNodeNameSetter(nodeName string) func(*apiv1.Pod) {
func nominatedNodeName(nodeName string) func(*apiv1.Pod) {
return func(pod *apiv1.Pod) {
pod.Status.NominatedNodeName = nodeName
}
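For context, the renamed helpers are small functional options for test.BuildTestPod; a sketch of how they compose (passing several setters in one call is an assumption based on the variadic usage above):

pod := test.BuildTestPod("p6", 1000, 1, priority(5), nominatedNodeName("node-1"))
// pod.Spec.Priority is now 5 and pod.Status.NominatedNodeName is "node-1".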
@@ -32,7 +32,7 @@ type defaultPodListProcessor struct {
func NewDefaultPodListProcessor(predicateChecker predicatechecker.PredicateChecker) *defaultPodListProcessor {
return &defaultPodListProcessor{
processors: []pods.PodListProcessor{
NewFilterOutExpandablePodListProcessor(),
NewFilterOutExpendablePodListProcessor(),
NewCurrentlyDrainedNodesPodListProcessor(),
NewFilterOutSchedulablePodListProcessor(predicateChecker),
NewFilterOutDaemonSetPodListProcessor(),
17 changes: 12 additions & 5 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -63,6 +63,13 @@ import (
)

const (
// How old the oldest unschedulable pod should be before starting scale up.
unschedulablePodTimeBuffer = 2 * time.Second
// How old the oldest unschedulable pod with GPU should be before starting scale up.
// The idea is that nodes with GPU are very expensive and we're ready to sacrifice
// a bit more latency to wait for more pods and make a more informed scale-up decision.
unschedulablePodWithGpuTimeBuffer = 30 * time.Second

// NodeUpcomingAnnotation is an annotation CA adds to nodes which are upcoming.
NodeUpcomingAnnotation = "cluster-autoscaler.k8s.io/upcoming-node"

Expand Down Expand Up @@ -301,7 +308,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
klog.Errorf("Failed to list pods: %v", err)
return caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
}
originalScheduledPods, unschedulablePods, unknownPods := kube_util.ScheduledPods(pods), kube_util.UnschedulablePods(pods), kube_util.UnknownPods(pods)
originalScheduledPods, unschedulablePods, schedulerUnprocessed := kube_util.ScheduledPods(pods), kube_util.UnschedulablePods(pods), kube_util.SchedulerUnprocessedPods(pods)

// Update cluster resource usage metrics
coresTotal, memoryTotal := calculateCoresMemoryTotal(allNodes, currentTime)
@@ -443,9 +450,9 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr

metrics.UpdateLastTime(metrics.Autoscaling, time.Now())

metrics.UpdateUnschedulablePodsCount(len(unschedulablePods), len(unknownPods))
metrics.UpdateUnschedulablePodsCount(len(unschedulablePods), len(schedulerUnprocessed))
// Treat scheduler-unprocessed pods as unschedulable; the pod list processor will remove schedulable pods
unschedulablePods = append(unschedulablePods, unknownPods...)
unschedulablePods = append(unschedulablePods, schedulerUnprocessed...)

// Upcoming nodes are recently created nodes that haven't registered in the cluster yet, or haven't become ready yet.
upcomingCounts, registeredUpcoming := a.clusterStateRegistry.GetUpcomingNodes()
@@ -530,7 +537,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
} else if a.MaxNodesTotal > 0 && len(readyNodes) >= a.MaxNodesTotal {
scaleUpStatus.Result = status.ScaleUpNoOptionsAvailable
klog.V(1).Info("Max total nodes in cluster reached")
} else if allPodsAreNew(unschedulablePodsToHelp, currentTime, a.AutoscalingOptions.UnschedulablePodTimeBuffer, a.AutoscalingOptions.UnschedulablePodWithGpuTimeBuffer) {
} else if a.EnablePodTimeBuffer && allPodsAreNew(unschedulablePodsToHelp, currentTime) {
// The assumption here is that these pods have been created very recently and probably there
// are more pods to come. In theory we could check the newest pod time, but then if pods were created
// slowly, at a pace of one every 2 seconds, no scale-up would be triggered for a long time.
@@ -963,7 +970,7 @@ func (a *StaticAutoscaler) reportTaintsCount(nodes []*apiv1.Node) {
}
}

func allPodsAreNew(pods []*apiv1.Pod, currentTime time.Time, unschedulablePodTimeBuffer, unschedulablePodWithGpuTimeBuffer time.Duration) bool {
func allPodsAreNew(pods []*apiv1.Pod, currentTime time.Time) bool {
if core_utils.GetOldestCreateTime(pods).Add(unschedulablePodTimeBuffer).After(currentTime) {
return true
}
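With the simplified signature, the function reads the package-level constants directly; a sketch of the first age check visible above, assuming core_utils.GetOldestCreateTime returns the creation time of the oldest pod in the slice:

// true while the oldest unschedulable pod is still younger than the 2s buffer,
// in which case RunOnce (now gated by EnablePodTimeBuffer) holds off on scaling up
stillTooNew := core_utils.GetOldestCreateTime(pods).Add(unschedulablePodTimeBuffer).After(currentTime)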
6 changes: 2 additions & 4 deletions cluster-autoscaler/main.go
@@ -243,8 +243,7 @@ var (
maxAllocatableDifferenceRatio = flag.Float64("max-allocatable-difference-ratio", config.DefaultMaxAllocatableDifferenceRatio, "Maximum difference in allocatable resources between two similar node groups to be considered for balancing. Value is a ratio of the smaller node group's allocatable resource.")
forceDaemonSets = flag.Bool("force-ds", false, "Blocks scale-up of node groups too small for all suitable Daemon Sets pods.")
dynamicNodeDeleteDelayAfterTaintEnabled = flag.Bool("dynamic-node-delete-delay-after-taint-enabled", false, "Enables dynamic adjustment of NodeDeleteDelayAfterTaint based on the latency between CA and api-server")
unschedulablePodTimeBuffer = flag.Duration("unschedulable-pod-time-buffer", 2*time.Second, "How old the oldest unschedulable pod should be before starting scale up.")
unschedulablePodWithGpuTimeBuffer = flag.Duration("unschedulable-pod-with-gpu-time-buffer", 30*time.Second, "How old the oldest unschedulable pod with GPU should be before starting scale up.")
enablePodTimeBuffer = flag.Bool("pod-time-buffer-enabled", true, "Enables waiting for a time buffer so that the oldest unschedulable pod is old enough before starting a scale-up.")
)

func isFlagPassed(name string) bool {
@@ -392,8 +391,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
MaxFreeDifferenceRatio: *maxFreeDifferenceRatio,
},
DynamicNodeDeleteDelayAfterTaintEnabled: *dynamicNodeDeleteDelayAfterTaintEnabled,
UnschedulablePodTimeBuffer: *unschedulablePodTimeBuffer,
UnschedulablePodWithGpuTimeBuffer: *unschedulablePodWithGpuTimeBuffer,
EnablePodTimeBuffer: *enablePodTimeBuffer,
}
}

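Operationally, the two duration flags are removed outright, so invocations that still pass --unschedulable-pod-time-buffer or --unschedulable-pod-with-gpu-time-buffer will fail flag parsing; the replacement --pod-time-buffer-enabled flag defaults to true, which should preserve the previous 2s/30s waiting behaviour for anyone not setting it.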
13 changes: 9 additions & 4 deletions cluster-autoscaler/metrics/metrics.go
@@ -141,7 +141,7 @@ var (
Namespace: caNamespace,
Name: "unschedulable_pods_count",
Help: "Number of unschedulable pods in the cluster.",
}, []string{"count_type"},
}, []string{"type"},
)

maxNodesCount = k8smetrics.NewGauge(
@@ -473,9 +473,14 @@ func UpdateNodeGroupsCount(autoscaled, autoprovisioned int) {
}

// UpdateUnschedulablePodsCount records number of currently unschedulable pods
func UpdateUnschedulablePodsCount(uschedulablePodsCount, unknownPodsCount int) {
unschedulablePodsCount.WithLabelValues("unschedulable").Set(float64(uschedulablePodsCount))
unschedulablePodsCount.WithLabelValues("unknown").Set(float64(unknownPodsCount))
func UpdateUnschedulablePodsCount(uschedulablePodsCount, schedulerUnprocessedCount int) {
UpdateUnschedulablePodsCountWithLabel(uschedulablePodsCount, "unschedulable")
UpdateUnschedulablePodsCountWithLabel(schedulerUnprocessedCount, "scheduler_unprocessed")
}

// UpdateUnschedulablePodsCount records number of currently unschedulable pods wil label "type" value "label"
func UpdateUnschedulablePodsCountWithLabel(uschedulablePodsCount int, label string) {
unschedulablePodsCount.WithLabelValues(label).Set(float64(uschedulablePodsCount))
}

// UpdateMaxNodesCount records the current maximum number of nodes being set for all node groups
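Note that renaming the label key from count_type to type changes the emitted series, so dashboards or alerts that select on count_type for the unschedulable_pods_count metric would likely need updating. A minimal usage sketch of the new helpers (the counts are placeholders):

metrics.UpdateUnschedulablePodsCount(12, 3) // sets type="unschedulable" to 12 and type="scheduler_unprocessed" to 3
metrics.UpdateUnschedulablePodsCountWithLabel(5, "scheduler_unprocessed") // sets a single labelled series directly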
44 changes: 36 additions & 8 deletions cluster-autoscaler/utils/kubernetes/listers.go
@@ -144,33 +144,61 @@ type PodLister interface {
List() ([]*apiv1.Pod, error)
}

func isScheduled(pod *apiv1.Pod) bool {
if pod == nil {
return false
}
return pod.Spec.NodeName != ""
}
func isDeleted(pod *apiv1.Pod) bool {
if pod == nil {
return false
}
return pod.GetDeletionTimestamp() != nil
}
func isUnschedulable(pod *apiv1.Pod) bool {
if pod == nil {
return false
}
if isScheduled(pod) || isDeleted(pod) {
return false
}
_, condition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled)
if condition == nil || condition.Status != apiv1.ConditionFalse || condition.Reason != apiv1.PodReasonUnschedulable {
return false
}
return true
}

// ScheduledPods is a helper method that returns all scheduled pods from given pod list.
func ScheduledPods(allPods []*apiv1.Pod) []*apiv1.Pod {
var scheduledPods []*apiv1.Pod
for _, pod := range allPods {
if pod.Spec.NodeName != "" {
if isScheduled(pod) {
scheduledPods = append(scheduledPods, pod)
continue
}
}
return scheduledPods
}

// UnknownPods is a helper method that returns all pods which are not yet processed by the scheduler
func UnknownPods(allPods []*apiv1.Pod) []*apiv1.Pod {
var unknownPods []*apiv1.Pod
// SchedulerUnprocessedPods is a helper method that returns all pods which are not yet processed by the scheduler
func SchedulerUnprocessedPods(allPods []*apiv1.Pod) []*apiv1.Pod {
var unprocessedPods []*apiv1.Pod
for _, pod := range allPods {
// Make sure it's not scheduled, deleted, or already classified as unschedulable
if pod.Spec.NodeName != "" || pod.GetDeletionTimestamp() != nil {
if isScheduled(pod) || isDeleted(pod) || isUnschedulable(pod) {
continue
}
// Make sure it's not unschedulable
// Make sure that, if it's not scheduled, it is either
// not processed yet (the PodScheduled condition is nil),
// or its Reason is empty (i.e. not schedulerError, terminated, etc.)
_, condition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled)
if condition == nil || (condition.Status == apiv1.ConditionFalse && condition.Reason == "") {
unknownPods = append(unknownPods, pod)
unprocessedPods = append(unprocessedPods, pod)
}
}
return unknownPods
return unprocessedPods
}

// UnschedulablePods is a helper method that returns all unschedulable pods from given pod list.
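Taken together, the new helpers partition pending pods; an illustrative in-package sketch (the pod literal uses standard k8s.io/api/core/v1 fields and is not part of this diff):

pod := &apiv1.Pod{}
pod.Status.Conditions = []apiv1.PodCondition{{
    Type:   apiv1.PodScheduled,
    Status: apiv1.ConditionFalse,
    Reason: apiv1.PodReasonUnschedulable,
}}
_ = isScheduled(pod)     // false: Spec.NodeName is empty
_ = isDeleted(pod)       // false: no deletion timestamp
_ = isUnschedulable(pod) // true: PodScheduled=False with reason Unschedulable
// A pending pod with no PodScheduled condition at all (or an empty Reason) is instead
// returned by SchedulerUnprocessedPods and later treated as unschedulable by the caller.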
