From af638733e18329c3cbb59010fee55bdb67a13cf3 Mon Sep 17 00:00:00 2001 From: Artem Minyaylov Date: Wed, 27 Sep 2023 06:08:26 +0000 Subject: [PATCH 1/3] Extract drainability rules into packages --- .../core/scaledown/actuation/actuator.go | 11 ++-- .../core/scaledown/actuation/drain.go | 3 +- .../core/scaledown/legacy/legacy.go | 6 +- .../core/scaledown/planner/planner.go | 6 +- .../core/scaledown/planner/planner_test.go | 4 +- cluster-autoscaler/simulator/cluster.go | 10 +-- cluster-autoscaler/simulator/cluster_test.go | 3 +- cluster-autoscaler/simulator/drain.go | 44 ++++++------- cluster-autoscaler/simulator/drain_test.go | 26 ++++---- .../simulator/drainability/context.go | 26 ++++++++ .../{mirror.go => rules/mirror/rule.go} | 24 ++++---- .../mirror/rule_test.go} | 16 ++--- .../simulator/drainability/rules/rules.go | 61 +++++++++++++++++++ .../drainability/{rule.go => status.go} | 16 ----- 14 files changed, 165 insertions(+), 91 deletions(-) create mode 100644 cluster-autoscaler/simulator/drainability/context.go rename cluster-autoscaler/simulator/drainability/{mirror.go => rules/mirror/rule.go} (58%) rename cluster-autoscaler/simulator/drainability/{mirror_test.go => rules/mirror/rule_test.go} (79%) create mode 100644 cluster-autoscaler/simulator/drainability/rules/rules.go rename cluster-autoscaler/simulator/drainability/{rule.go => status.go} (86%) diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator.go b/cluster-autoscaler/core/scaledown/actuation/actuator.go index 247f04767f39..fa491d5eb9a6 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator.go @@ -21,7 +21,6 @@ import ( "time" apiv1 "k8s.io/api/core/v1" - policyv1 "k8s.io/api/policy/v1" "k8s.io/klog/v2" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" @@ -30,6 +29,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/core/scaledown" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status" "k8s.io/autoscaler/cluster-autoscaler/core/utils" "k8s.io/autoscaler/cluster-autoscaler/metrics" @@ -223,7 +223,7 @@ func (a *Actuator) deleteAsyncDrain(NodeGroupViews []*budgets.NodeGroupView) (re } func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider.NodeGroup, drain bool, batchSize int) { - var pdbs []*policyv1.PodDisruptionBudget + var remainingPdbTracker pdb.RemainingPdbTracker var registry kube_util.ListerRegistry if len(nodes) == 0 { @@ -246,7 +246,7 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider } if drain { - pdbs, err = a.ctx.PodDisruptionBudgetLister().List() + pdbs, err := a.ctx.PodDisruptionBudgetLister().List() if err != nil { klog.Errorf("Scale-down: couldn't fetch pod disruption budgets, err: %v", err) nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "podDisruptionBudgetLister.List returned error %v", err)} @@ -255,7 +255,8 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider } return } - + remainingPdbTracker = pdb.NewBasicRemainingPdbTracker() + remainingPdbTracker.SetPdbs(pdbs) registry = a.ctx.ListerRegistry } @@ -272,7 +273,7 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider continue } - podsToRemove, _, _, err := 
simulator.GetPodsToMove(nodeInfo, a.deleteOptions, registry, pdbs, time.Now()) + podsToRemove, _, _, err := simulator.GetPodsToMove(nodeInfo, a.deleteOptions, registry, remainingPdbTracker, time.Now()) if err != nil { klog.Errorf("Scale-down: couldn't delete node %q, err: %v", node.Name, err) nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "GetPodsToMove for %q returned error: %v", node.Name, err)} diff --git a/cluster-autoscaler/core/scaledown/actuation/drain.go b/cluster-autoscaler/core/scaledown/actuation/drain.go index e479abe34a99..97a4541da354 100644 --- a/cluster-autoscaler/core/scaledown/actuation/drain.go +++ b/cluster-autoscaler/core/scaledown/actuation/drain.go @@ -23,7 +23,6 @@ import ( "time" apiv1 "k8s.io/api/core/v1" - policyv1 "k8s.io/api/policy/v1" policyv1beta1 "k8s.io/api/policy/v1beta1" kube_errors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -177,7 +176,7 @@ func (e Evictor) DrainNodeWithPods(ctx *acontext.AutoscalingContext, node *apiv1 // EvictDaemonSetPods creates eviction objects for all DaemonSet pods on the node. func (e Evictor) EvictDaemonSetPods(ctx *acontext.AutoscalingContext, nodeInfo *framework.NodeInfo, timeNow time.Time) error { nodeToDelete := nodeInfo.Node() - _, daemonSetPods, _, err := simulator.GetPodsToMove(nodeInfo, e.deleteOptions, nil, []*policyv1.PodDisruptionBudget{}, timeNow) + _, daemonSetPods, _, err := simulator.GetPodsToMove(nodeInfo, e.deleteOptions, nil, nil, timeNow) if err != nil { return fmt.Errorf("failed to get DaemonSet pods for %s (error: %v)", nodeToDelete.Name, err) } diff --git a/cluster-autoscaler/core/scaledown/legacy/legacy.go b/cluster-autoscaler/core/scaledown/legacy/legacy.go index 65dba06f7f4c..268f874651dd 100644 --- a/cluster-autoscaler/core/scaledown/legacy/legacy.go +++ b/cluster-autoscaler/core/scaledown/legacy/legacy.go @@ -147,7 +147,7 @@ func (sd *ScaleDown) UpdateUnneededNodes( currentCandidates, destinations, timestamp, - sd.context.RemainingPdbTracker.GetPdbs()) + sd.context.RemainingPdbTracker) additionalCandidatesCount := sd.context.ScaleDownNonEmptyCandidatesCount - len(nodesToRemove) if additionalCandidatesCount > len(currentNonCandidates) { @@ -169,7 +169,7 @@ func (sd *ScaleDown) UpdateUnneededNodes( currentNonCandidates[:additionalCandidatesPoolSize], destinations, timestamp, - sd.context.RemainingPdbTracker.GetPdbs()) + sd.context.RemainingPdbTracker) if len(additionalNodesToRemove) > additionalCandidatesCount { additionalNodesToRemove = additionalNodesToRemove[:additionalCandidatesCount] } @@ -317,7 +317,7 @@ func (sd *ScaleDown) NodesToDelete(currentTime time.Time) (_, drain []*apiv1.Nod candidateNames, allNodeNames, time.Now(), - sd.context.RemainingPdbTracker.GetPdbs()) + sd.context.RemainingPdbTracker) findNodesToRemoveDuration = time.Now().Sub(findNodesToRemoveStart) for _, unremovableNode := range unremovable { diff --git a/cluster-autoscaler/core/scaledown/planner/planner.go b/cluster-autoscaler/core/scaledown/planner/planner.go index 0cd4002c4af3..bca5e00ff738 100644 --- a/cluster-autoscaler/core/scaledown/planner/planner.go +++ b/cluster-autoscaler/core/scaledown/planner/planner.go @@ -22,11 +22,11 @@ import ( "time" apiv1 "k8s.io/api/core/v1" - policyv1 "k8s.io/api/policy/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown" 
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unneeded" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable" @@ -47,7 +47,7 @@ type eligibilityChecker interface { type removalSimulator interface { DropOldHints() - SimulateNodeRemoval(node string, podDestinations map[string]bool, timestamp time.Time, pdbs []*policyv1.PodDisruptionBudget) (*simulator.NodeToBeRemoved, *simulator.UnremovableNode) + SimulateNodeRemoval(node string, podDestinations map[string]bool, timestamp time.Time, remainingPdbTracker pdb.RemainingPdbTracker) (*simulator.NodeToBeRemoved, *simulator.UnremovableNode) } // controllerReplicasCalculator calculates a number of target and expected replicas for a given controller. @@ -276,7 +276,7 @@ func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCand klog.V(4).Infof("%d out of %d nodes skipped in scale down simulation: there are already %d unneeded nodes so no point in looking for more.", len(currentlyUnneededNodeNames)-i, len(currentlyUnneededNodeNames), len(removableList)) break } - removable, unremovable := p.rs.SimulateNodeRemoval(node, podDestinations, p.latestUpdate, p.context.RemainingPdbTracker.GetPdbs()) + removable, unremovable := p.rs.SimulateNodeRemoval(node, podDestinations, p.latestUpdate, p.context.RemainingPdbTracker) if removable != nil { _, inParallel, _ := p.context.RemainingPdbTracker.CanRemovePods(removable.PodsToReschedule) if !inParallel { diff --git a/cluster-autoscaler/core/scaledown/planner/planner_test.go b/cluster-autoscaler/core/scaledown/planner/planner_test.go index d3c14a61b733..2fcf5cc866b3 100644 --- a/cluster-autoscaler/core/scaledown/planner/planner_test.go +++ b/cluster-autoscaler/core/scaledown/planner/planner_test.go @@ -24,7 +24,6 @@ import ( "github.com/stretchr/testify/assert" appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" - policyv1 "k8s.io/api/policy/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" @@ -32,6 +31,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/config" "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable" . 
"k8s.io/autoscaler/cluster-autoscaler/core/test" @@ -901,7 +901,7 @@ type fakeRemovalSimulator struct { func (r *fakeRemovalSimulator) DropOldHints() {} -func (r *fakeRemovalSimulator) SimulateNodeRemoval(name string, _ map[string]bool, _ time.Time, _ []*policyv1.PodDisruptionBudget) (*simulator.NodeToBeRemoved, *simulator.UnremovableNode) { +func (r *fakeRemovalSimulator) SimulateNodeRemoval(name string, _ map[string]bool, _ time.Time, _ pdb.RemainingPdbTracker) (*simulator.NodeToBeRemoved, *simulator.UnremovableNode) { time.Sleep(r.sleep) node := &apiv1.Node{} for _, n := range r.nodes { diff --git a/cluster-autoscaler/simulator/cluster.go b/cluster-autoscaler/simulator/cluster.go index 23ccd037e2fa..1af916b64bf9 100644 --- a/cluster-autoscaler/simulator/cluster.go +++ b/cluster-autoscaler/simulator/cluster.go @@ -20,6 +20,7 @@ import ( "fmt" "time" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" @@ -29,7 +30,6 @@ import ( schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" apiv1 "k8s.io/api/core/v1" - policyv1 "k8s.io/api/policy/v1" klog "k8s.io/klog/v2" ) @@ -117,7 +117,7 @@ func (r *RemovalSimulator) FindNodesToRemove( candidates []string, destinations []string, timestamp time.Time, - pdbs []*policyv1.PodDisruptionBudget, + remainingPdbTracker pdb.RemainingPdbTracker, ) (nodesToRemove []NodeToBeRemoved, unremovableNodes []*UnremovableNode) { result := make([]NodeToBeRemoved, 0) unremovable := make([]*UnremovableNode, 0) @@ -128,7 +128,7 @@ func (r *RemovalSimulator) FindNodesToRemove( } for _, nodeName := range candidates { - rn, urn := r.SimulateNodeRemoval(nodeName, destinationMap, timestamp, pdbs) + rn, urn := r.SimulateNodeRemoval(nodeName, destinationMap, timestamp, remainingPdbTracker) if rn != nil { result = append(result, *rn) } else if urn != nil { @@ -146,7 +146,7 @@ func (r *RemovalSimulator) SimulateNodeRemoval( nodeName string, destinationMap map[string]bool, timestamp time.Time, - pdbs []*policyv1.PodDisruptionBudget, + remainingPdbTracker pdb.RemainingPdbTracker, ) (*NodeToBeRemoved, *UnremovableNode) { nodeInfo, err := r.clusterSnapshot.NodeInfos().Get(nodeName) if err != nil { @@ -159,7 +159,7 @@ func (r *RemovalSimulator) SimulateNodeRemoval( return nil, &UnremovableNode{Node: nodeInfo.Node(), Reason: UnexpectedError} } - podsToRemove, daemonSetPods, blockingPod, err := GetPodsToMove(nodeInfo, r.deleteOptions, r.listers, pdbs, timestamp) + podsToRemove, daemonSetPods, blockingPod, err := GetPodsToMove(nodeInfo, r.deleteOptions, r.listers, remainingPdbTracker, timestamp) if err != nil { klog.V(2).Infof("node %s cannot be removed: %v", nodeName, err) if blockingPod != nil { diff --git a/cluster-autoscaler/simulator/cluster_test.go b/cluster-autoscaler/simulator/cluster_test.go index 9a8b745466be..8c141eef56cf 100644 --- a/cluster-autoscaler/simulator/cluster_test.go +++ b/cluster-autoscaler/simulator/cluster_test.go @@ -30,7 +30,6 @@ import ( "github.com/stretchr/testify/assert" appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" - policyv1 "k8s.io/api/policy/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/kubelet/types" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" @@ -207,7 +206,7 @@ func TestFindNodesToRemove(t *testing.T) { } clustersnapshot.InitializeClusterSnapshotOrDie(t, clusterSnapshot, test.allNodes, 
test.pods) r := NewRemovalSimulator(registry, clusterSnapshot, predicateChecker, tracker, testDeleteOptions(), false) - toRemove, unremovable := r.FindNodesToRemove(test.candidates, destinations, time.Now(), []*policyv1.PodDisruptionBudget{}) + toRemove, unremovable := r.FindNodesToRemove(test.candidates, destinations, time.Now(), nil) fmt.Printf("Test scenario: %s, found len(toRemove)=%v, expected len(test.toRemove)=%v\n", test.name, len(toRemove), len(test.toRemove)) assert.Equal(t, toRemove, test.toRemove) assert.Equal(t, unremovable, test.unremovable) diff --git a/cluster-autoscaler/simulator/drain.go b/cluster-autoscaler/simulator/drain.go index 21edf94c1a0c..55090023debf 100644 --- a/cluster-autoscaler/simulator/drain.go +++ b/cluster-autoscaler/simulator/drain.go @@ -25,7 +25,9 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/autoscaler/cluster-autoscaler/config" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb" "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability" + "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" "k8s.io/autoscaler/cluster-autoscaler/utils/drain" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod" @@ -44,7 +46,7 @@ type NodeDeleteOptions struct { // to allow their pods deletion in scale down MinReplicaCount int // DrainabilityRules contain a list of checks that are used to verify whether a pod can be drained from node. - DrainabilityRules []drainability.Rule + DrainabilityRules rules.Rules } // NewNodeDeleteOptions returns new node delete options extracted from autoscaling options @@ -54,7 +56,7 @@ func NewNodeDeleteOptions(opts config.AutoscalingOptions) NodeDeleteOptions { SkipNodesWithLocalStorage: opts.SkipNodesWithLocalStorage, MinReplicaCount: opts.MinReplicaCount, SkipNodesWithCustomControllerPods: opts.SkipNodesWithCustomControllerPods, - DrainabilityRules: drainability.DefaultRules(), + DrainabilityRules: rules.Default(), } } @@ -67,16 +69,22 @@ func NewNodeDeleteOptions(opts config.AutoscalingOptions) NodeDeleteOptions { // still exist. // TODO(x13n): Rewrite GetPodsForDeletionOnNodeDrain into a set of DrainabilityRules. 
func GetPodsToMove(nodeInfo *schedulerframework.NodeInfo, deleteOptions NodeDeleteOptions, listers kube_util.ListerRegistry, - pdbs []*policyv1.PodDisruptionBudget, timestamp time.Time) (pods []*apiv1.Pod, daemonSetPods []*apiv1.Pod, blockingPod *drain.BlockingPod, err error) { + remainingPdbTracker pdb.RemainingPdbTracker, timestamp time.Time) (pods []*apiv1.Pod, daemonSetPods []*apiv1.Pod, blockingPod *drain.BlockingPod, err error) { var drainPods, drainDs []*apiv1.Pod drainabilityRules := deleteOptions.DrainabilityRules if drainabilityRules == nil { - drainabilityRules = drainability.DefaultRules() + drainabilityRules = rules.Default() + } + if remainingPdbTracker == nil { + remainingPdbTracker = pdb.NewBasicRemainingPdbTracker() + } + drainCtx := &drainability.DrainContext{ + RemainingPdbTracker: remainingPdbTracker, } for _, podInfo := range nodeInfo.Pods { pod := podInfo.Pod - d := drainabilityStatus(pod, drainabilityRules) - switch d.Outcome { + status := drainabilityRules.Drainable(drainCtx, pod) + switch status.Outcome { case drainability.UndefinedOutcome: pods = append(pods, podInfo.Pod) case drainability.DrainOk: @@ -86,15 +94,18 @@ func GetPodsToMove(nodeInfo *schedulerframework.NodeInfo, deleteOptions NodeDele drainPods = append(drainPods, pod) } case drainability.BlockDrain: - blockingPod = &drain.BlockingPod{pod, d.BlockingReason} - err = d.Error + blockingPod = &drain.BlockingPod{ + Pod: pod, + Reason: status.BlockingReason, + } + err = status.Error return - case drainability.SkipDrain: } } + pods, daemonSetPods, blockingPod, err = drain.GetPodsForDeletionOnNodeDrain( pods, - pdbs, + remainingPdbTracker.GetPdbs(), deleteOptions.SkipNodesWithSystemPods, deleteOptions.SkipNodesWithLocalStorage, deleteOptions.SkipNodesWithCustomControllerPods, @@ -106,7 +117,7 @@ func GetPodsToMove(nodeInfo *schedulerframework.NodeInfo, deleteOptions NodeDele if err != nil { return pods, daemonSetPods, blockingPod, err } - if pdbBlockingPod, err := checkPdbs(pods, pdbs); err != nil { + if pdbBlockingPod, err := checkPdbs(pods, remainingPdbTracker.GetPdbs()); err != nil { return []*apiv1.Pod{}, []*apiv1.Pod{}, pdbBlockingPod, err } @@ -131,14 +142,3 @@ func checkPdbs(pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget) (*drain. } return nil, nil } - -func drainabilityStatus(pod *apiv1.Pod, dr []drainability.Rule) drainability.Status { - for _, f := range dr { - if d := f.Drainable(pod); d.Outcome != drainability.UndefinedOutcome { - return d - } - } - return drainability.Status{ - Outcome: drainability.UndefinedOutcome, - } -} diff --git a/cluster-autoscaler/simulator/drain_test.go b/cluster-autoscaler/simulator/drain_test.go index 02ad0ba372c1..32f4d64caadc 100644 --- a/cluster-autoscaler/simulator/drain_test.go +++ b/cluster-autoscaler/simulator/drain_test.go @@ -25,7 +25,9 @@ import ( policyv1 "k8s.io/api/policy/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb" "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability" + "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" "k8s.io/autoscaler/cluster-autoscaler/utils/drain" . 
"k8s.io/autoscaler/cluster-autoscaler/utils/test" "k8s.io/kubernetes/pkg/kubelet/types" @@ -179,7 +181,7 @@ func TestGetPodsToMove(t *testing.T) { desc string pods []*apiv1.Pod pdbs []*policyv1.PodDisruptionBudget - rules []drainability.Rule + rules []rules.Rule wantPods []*apiv1.Pod wantDs []*apiv1.Pod wantBlocking *drain.BlockingPod @@ -256,19 +258,19 @@ func TestGetPodsToMove(t *testing.T) { { desc: "Rule allows", pods: []*apiv1.Pod{unreplicatedPod}, - rules: []drainability.Rule{alwaysDrain{}}, + rules: []rules.Rule{alwaysDrain{}}, wantPods: []*apiv1.Pod{unreplicatedPod}, }, { desc: "Second rule allows", pods: []*apiv1.Pod{unreplicatedPod}, - rules: []drainability.Rule{cantDecide{}, alwaysDrain{}}, + rules: []rules.Rule{cantDecide{}, alwaysDrain{}}, wantPods: []*apiv1.Pod{unreplicatedPod}, }, { desc: "Rule blocks", pods: []*apiv1.Pod{rsPod}, - rules: []drainability.Rule{neverDrain{}}, + rules: []rules.Rule{neverDrain{}}, wantErr: true, wantBlocking: &drain.BlockingPod{ Pod: rsPod, @@ -278,7 +280,7 @@ func TestGetPodsToMove(t *testing.T) { { desc: "Second rule blocks", pods: []*apiv1.Pod{rsPod}, - rules: []drainability.Rule{cantDecide{}, neverDrain{}}, + rules: []rules.Rule{cantDecide{}, neverDrain{}}, wantErr: true, wantBlocking: &drain.BlockingPod{ Pod: rsPod, @@ -288,7 +290,7 @@ func TestGetPodsToMove(t *testing.T) { { desc: "Undecisive rule fallback to default logic: Unreplicated pod", pods: []*apiv1.Pod{unreplicatedPod}, - rules: []drainability.Rule{cantDecide{}}, + rules: []rules.Rule{cantDecide{}}, wantErr: true, wantBlocking: &drain.BlockingPod{ Pod: unreplicatedPod, @@ -298,7 +300,7 @@ func TestGetPodsToMove(t *testing.T) { { desc: "Undecisive rule fallback to default logic: Replicated pod", pods: []*apiv1.Pod{rsPod}, - rules: []drainability.Rule{cantDecide{}}, + rules: []rules.Rule{cantDecide{}}, wantPods: []*apiv1.Pod{rsPod}, }, } @@ -311,7 +313,9 @@ func TestGetPodsToMove(t *testing.T) { SkipNodesWithCustomControllerPods: true, DrainabilityRules: tc.rules, } - p, d, b, err := GetPodsToMove(schedulerframework.NewNodeInfo(tc.pods...), deleteOptions, nil, tc.pdbs, testTime) + tracker := pdb.NewBasicRemainingPdbTracker() + tracker.SetPdbs(tc.pdbs) + p, d, b, err := GetPodsToMove(schedulerframework.NewNodeInfo(tc.pods...), deleteOptions, nil, tracker, testTime) if tc.wantErr { assert.Error(t, err) } else { @@ -326,18 +330,18 @@ func TestGetPodsToMove(t *testing.T) { type alwaysDrain struct{} -func (a alwaysDrain) Drainable(*apiv1.Pod) drainability.Status { +func (a alwaysDrain) Drainable(*drainability.DrainContext, *apiv1.Pod) drainability.Status { return drainability.NewDrainableStatus() } type neverDrain struct{} -func (n neverDrain) Drainable(*apiv1.Pod) drainability.Status { +func (n neverDrain) Drainable(*drainability.DrainContext, *apiv1.Pod) drainability.Status { return drainability.NewBlockedStatus(drain.UnexpectedError, fmt.Errorf("nope")) } type cantDecide struct{} -func (c cantDecide) Drainable(*apiv1.Pod) drainability.Status { +func (c cantDecide) Drainable(*drainability.DrainContext, *apiv1.Pod) drainability.Status { return drainability.NewUndefinedStatus() } diff --git a/cluster-autoscaler/simulator/drainability/context.go b/cluster-autoscaler/simulator/drainability/context.go new file mode 100644 index 000000000000..c2a341a84f8d --- /dev/null +++ b/cluster-autoscaler/simulator/drainability/context.go @@ -0,0 +1,26 @@ +/* +Copyright 2023 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package drainability + +import ( + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb" +) + +// DrainContext contains parameters for drainability rules. +type DrainContext struct { + RemainingPdbTracker pdb.RemainingPdbTracker +} diff --git a/cluster-autoscaler/simulator/drainability/mirror.go b/cluster-autoscaler/simulator/drainability/rules/mirror/rule.go similarity index 58% rename from cluster-autoscaler/simulator/drainability/mirror.go rename to cluster-autoscaler/simulator/drainability/rules/mirror/rule.go index 668c3993220b..549fd6c9fbfa 100644 --- a/cluster-autoscaler/simulator/drainability/mirror.go +++ b/cluster-autoscaler/simulator/drainability/rules/mirror/rule.go @@ -14,26 +14,26 @@ See the License for the specific language governing permissions and limitations under the License. */ -package drainability +package mirror import ( - "k8s.io/autoscaler/cluster-autoscaler/utils/pod" - apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability" + pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod" ) -// MirrorPodRule is a drainability rule on how to handle mirror pods. -type MirrorPodRule struct{} +// Rule is a drainability rule on how to handle mirror pods. +type Rule struct{} -// NewMirrorPodRule creates a new MirrorPodRule. -func NewMirrorPodRule() *MirrorPodRule { - return &MirrorPodRule{} +// New creates a new Rule. +func New() *Rule { + return &Rule{} } // Drainable decides what to do with mirror pods on node drain. -func (m *MirrorPodRule) Drainable(p *apiv1.Pod) Status { - if pod.IsMirrorPod(p) { - return NewSkipStatus() +func (Rule) Drainable(drainCtx *drainability.DrainContext, pod *apiv1.Pod) drainability.Status { + if pod_util.IsMirrorPod(pod) { + return drainability.NewSkipStatus() } - return NewUndefinedStatus() + return drainability.NewUndefinedStatus() } diff --git a/cluster-autoscaler/simulator/drainability/mirror_test.go b/cluster-autoscaler/simulator/drainability/rules/mirror/rule_test.go similarity index 79% rename from cluster-autoscaler/simulator/drainability/mirror_test.go rename to cluster-autoscaler/simulator/drainability/rules/mirror/rule_test.go index 961b3d925eae..e05613daaedd 100644 --- a/cluster-autoscaler/simulator/drainability/mirror_test.go +++ b/cluster-autoscaler/simulator/drainability/rules/mirror/rule_test.go @@ -14,21 +14,22 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package drainability +package mirror import ( "testing" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability" "k8s.io/kubernetes/pkg/kubelet/types" ) -func TestMirrorPodRule(t *testing.T) { +func TestRule(t *testing.T) { testCases := []struct { desc string pod *apiv1.Pod - want Status + want drainability.Status }{ { desc: "non mirror pod", @@ -38,7 +39,7 @@ func TestMirrorPodRule(t *testing.T) { Namespace: "ns", }, }, - want: NewUndefinedStatus(), + want: drainability.NewUndefinedStatus(), }, { desc: "mirror pod", @@ -51,15 +52,14 @@ func TestMirrorPodRule(t *testing.T) { }, }, }, - want: NewSkipStatus(), + want: drainability.NewSkipStatus(), }, } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - m := NewMirrorPodRule() - got := m.Drainable(tc.pod) + got := New().Drainable(nil, tc.pod) if tc.want != got { - t.Errorf("MirrorPodRule.Drainable(%v) = %v, want %v", tc.pod.Name, got, tc.want) + t.Errorf("Rule.Drainable(%v) = %v, want %v", tc.pod.Name, got, tc.want) } }) } diff --git a/cluster-autoscaler/simulator/drainability/rules/rules.go b/cluster-autoscaler/simulator/drainability/rules/rules.go new file mode 100644 index 000000000000..1733a2b8dac5 --- /dev/null +++ b/cluster-autoscaler/simulator/drainability/rules/rules.go @@ -0,0 +1,61 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rules + +import ( + apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb" + "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability" + "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules/mirror" +) + +// Rule determines whether a given pod can be drained or not. +type Rule interface { + // Drainable determines whether a given pod is drainable according to + // the specific Rule. + // + // DrainContext cannot be nil. + Drainable(*drainability.DrainContext, *apiv1.Pod) drainability.Status +} + +// Default returns the default list of Rules. +func Default() Rules { + return []Rule{ + mirror.New(), + } +} + +// Rules defines operations on a collections of rules. +type Rules []Rule + +// Drainable determines whether a given pod is drainable according to the +// specified set of rules. 
+func (rs Rules) Drainable(drainCtx *drainability.DrainContext, pod *apiv1.Pod) drainability.Status { + if drainCtx == nil { + drainCtx = &drainability.DrainContext{} + } + if drainCtx.RemainingPdbTracker == nil { + drainCtx.RemainingPdbTracker = pdb.NewBasicRemainingPdbTracker() + } + + for _, r := range rs { + if d := r.Drainable(drainCtx, pod); d.Outcome != drainability.UndefinedOutcome { + return d + } + } + return drainability.NewUndefinedStatus() +} diff --git a/cluster-autoscaler/simulator/drainability/rule.go b/cluster-autoscaler/simulator/drainability/status.go similarity index 86% rename from cluster-autoscaler/simulator/drainability/rule.go rename to cluster-autoscaler/simulator/drainability/status.go index f84b33b60edd..d73f346bead2 100644 --- a/cluster-autoscaler/simulator/drainability/rule.go +++ b/cluster-autoscaler/simulator/drainability/status.go @@ -18,8 +18,6 @@ package drainability import ( "k8s.io/autoscaler/cluster-autoscaler/utils/drain" - - apiv1 "k8s.io/api/core/v1" ) // OutcomeType identifies the action that should be taken when it comes to @@ -79,17 +77,3 @@ func NewSkipStatus() Status { func NewUndefinedStatus() Status { return Status{} } - -// Rule determines whether a given pod can be drained or not. -type Rule interface { - // Drainable determines whether a given pod is drainable according to - // the specific Rule. - Drainable(*apiv1.Pod) Status -} - -// DefaultRules returns the default list of Rules. -func DefaultRules() []Rule { - return []Rule{ - NewMirrorPodRule(), - } -} From a68b748fd7d7e008065c383faedfdd1d63ffb5a9 Mon Sep 17 00:00:00 2001 From: Artem Minyaylov Date: Thu, 28 Sep 2023 23:48:26 +0000 Subject: [PATCH 2/3] Refactor NodeDeleteOptions for use in drainability rules --- cluster-autoscaler/core/autoscaler.go | 10 ++-- .../core/scaledown/actuation/actuator.go | 12 +++-- .../core/scaledown/actuation/drain.go | 10 ++-- .../core/scaledown/legacy/legacy.go | 6 ++- .../core/scaledown/legacy/legacy_test.go | 8 ++-- .../core/scaledown/planner/planner.go | 6 ++- .../core/scaledown/planner/planner_test.go | 13 ++--- cluster-autoscaler/core/static_autoscaler.go | 11 +++-- .../core/static_autoscaler_test.go | 25 +++++----- cluster-autoscaler/main.go | 16 +++---- .../empty_candidates_sorting.go | 15 ++++-- .../empty_candidates_sorting_test.go | 31 ++++++------ cluster-autoscaler/simulator/cluster.go | 12 +++-- cluster-autoscaler/simulator/cluster_test.go | 10 ++-- cluster-autoscaler/simulator/drain.go | 33 ++----------- cluster-autoscaler/simulator/drain_test.go | 7 ++- .../simulator/drainability/context.go | 2 + .../simulator/options/nodedelete.go | 48 +++++++++++++++++++ 18 files changed, 164 insertions(+), 111 deletions(-) create mode 100644 cluster-autoscaler/simulator/options/nodedelete.go diff --git a/cluster-autoscaler/core/autoscaler.go b/cluster-autoscaler/core/autoscaler.go index 4fd81f3e7436..29d4b841b8ba 100644 --- a/cluster-autoscaler/core/autoscaler.go +++ b/cluster-autoscaler/core/autoscaler.go @@ -31,8 +31,9 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/expander" "k8s.io/autoscaler/cluster-autoscaler/expander/factory" ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" - "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" + "k8s.io/autoscaler/cluster-autoscaler/simulator/options" "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" "k8s.io/autoscaler/cluster-autoscaler/utils/backoff" 
"k8s.io/autoscaler/cluster-autoscaler/utils/errors" @@ -57,7 +58,8 @@ type AutoscalerOptions struct { DebuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter RemainingPdbTracker pdb.RemainingPdbTracker ScaleUpOrchestrator scaleup.Orchestrator - DeleteOptions simulator.NodeDeleteOptions + DeleteOptions options.NodeDeleteOptions + DrainabilityRules rules.Rules } // Autoscaler is the main component of CA which scales up/down node groups according to its configuration @@ -90,7 +92,9 @@ func NewAutoscaler(opts AutoscalerOptions) (Autoscaler, errors.AutoscalerError) opts.DebuggingSnapshotter, opts.RemainingPdbTracker, opts.ScaleUpOrchestrator, - opts.DeleteOptions), nil + opts.DeleteOptions, + opts.DrainabilityRules, + ), nil } // Initialize default options if not provided. diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator.go b/cluster-autoscaler/core/scaledown/actuation/actuator.go index fa491d5eb9a6..d2f4315ea0e4 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator.go @@ -35,6 +35,8 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/metrics" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" + "k8s.io/autoscaler/cluster-autoscaler/simulator/options" "k8s.io/autoscaler/cluster-autoscaler/simulator/utilization" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" @@ -47,7 +49,8 @@ type Actuator struct { clusterState *clusterstate.ClusterStateRegistry nodeDeletionTracker *deletiontracker.NodeDeletionTracker nodeDeletionScheduler *GroupDeletionScheduler - deleteOptions simulator.NodeDeleteOptions + deleteOptions options.NodeDeleteOptions + drainabilityRules rules.Rules // TODO: Move budget processor to scaledown planner, potentially merge into PostFilteringScaleDownNodeProcessor // This is a larger change to the code structure which impacts some existing actuator unit tests // as well as Cluster Autoscaler implementations that may override ScaleDownSetProcessor @@ -64,15 +67,16 @@ type actuatorNodeGroupConfigGetter interface { } // NewActuator returns a new instance of Actuator. 
-func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker, deleteOptions simulator.NodeDeleteOptions, configGetter actuatorNodeGroupConfigGetter) *Actuator { +func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator { ndb := NewNodeDeletionBatcher(ctx, csr, ndt, ctx.NodeDeletionBatcherInterval) return &Actuator{ ctx: ctx, clusterState: csr, nodeDeletionTracker: ndt, - nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, NewDefaultEvictor(deleteOptions, ndt)), + nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, NewDefaultEvictor(deleteOptions, drainabilityRules, ndt)), budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx), deleteOptions: deleteOptions, + drainabilityRules: drainabilityRules, configGetter: configGetter, nodeDeleteDelayAfterTaint: ctx.NodeDeleteDelayAfterTaint, } @@ -273,7 +277,7 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider continue } - podsToRemove, _, _, err := simulator.GetPodsToMove(nodeInfo, a.deleteOptions, registry, remainingPdbTracker, time.Now()) + podsToRemove, _, _, err := simulator.GetPodsToMove(nodeInfo, a.deleteOptions, a.drainabilityRules, registry, remainingPdbTracker, time.Now()) if err != nil { klog.Errorf("Scale-down: couldn't delete node %q, err: %v", node.Name, err) nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "GetPodsToMove for %q returned error: %v", node.Name, err)} diff --git a/cluster-autoscaler/core/scaledown/actuation/drain.go b/cluster-autoscaler/core/scaledown/actuation/drain.go index 97a4541da354..456d29045d2e 100644 --- a/cluster-autoscaler/core/scaledown/actuation/drain.go +++ b/cluster-autoscaler/core/scaledown/actuation/drain.go @@ -32,6 +32,8 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status" "k8s.io/autoscaler/cluster-autoscaler/metrics" "k8s.io/autoscaler/cluster-autoscaler/simulator" + "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" + "k8s.io/autoscaler/cluster-autoscaler/simulator/options" "k8s.io/autoscaler/cluster-autoscaler/utils/daemonset" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod" @@ -61,11 +63,12 @@ type Evictor struct { DsEvictionEmptyNodeTimeout time.Duration PodEvictionHeadroom time.Duration evictionRegister evictionRegister - deleteOptions simulator.NodeDeleteOptions + deleteOptions options.NodeDeleteOptions + drainabilityRules rules.Rules } // NewDefaultEvictor returns an instance of Evictor using the default parameters. 
-func NewDefaultEvictor(deleteOptions simulator.NodeDeleteOptions, evictionRegister evictionRegister) Evictor { +func NewDefaultEvictor(deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, evictionRegister evictionRegister) Evictor { return Evictor{ EvictionRetryTime: DefaultEvictionRetryTime, DsEvictionRetryTime: DefaultDsEvictionRetryTime, @@ -73,6 +76,7 @@ func NewDefaultEvictor(deleteOptions simulator.NodeDeleteOptions, evictionRegist PodEvictionHeadroom: DefaultPodEvictionHeadroom, evictionRegister: evictionRegister, deleteOptions: deleteOptions, + drainabilityRules: drainabilityRules, } } @@ -176,7 +180,7 @@ func (e Evictor) DrainNodeWithPods(ctx *acontext.AutoscalingContext, node *apiv1 // EvictDaemonSetPods creates eviction objects for all DaemonSet pods on the node. func (e Evictor) EvictDaemonSetPods(ctx *acontext.AutoscalingContext, nodeInfo *framework.NodeInfo, timeNow time.Time) error { nodeToDelete := nodeInfo.Node() - _, daemonSetPods, _, err := simulator.GetPodsToMove(nodeInfo, e.deleteOptions, nil, nil, timeNow) + _, daemonSetPods, _, err := simulator.GetPodsToMove(nodeInfo, e.deleteOptions, e.drainabilityRules, nil, nil, timeNow) if err != nil { return fmt.Errorf("failed to get DaemonSet pods for %s (error: %v)", nodeToDelete.Name, err) } diff --git a/cluster-autoscaler/core/scaledown/legacy/legacy.go b/cluster-autoscaler/core/scaledown/legacy/legacy.go index 268f874651dd..2cdd0ec168f3 100644 --- a/cluster-autoscaler/core/scaledown/legacy/legacy.go +++ b/cluster-autoscaler/core/scaledown/legacy/legacy.go @@ -32,6 +32,8 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/metrics" "k8s.io/autoscaler/cluster-autoscaler/processors" "k8s.io/autoscaler/cluster-autoscaler/simulator" + "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" + "k8s.io/autoscaler/cluster-autoscaler/simulator/options" "k8s.io/autoscaler/cluster-autoscaler/simulator/utilization" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" @@ -55,9 +57,9 @@ type ScaleDown struct { } // NewScaleDown builds new ScaleDown object. 
-func NewScaleDown(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, ndt *deletiontracker.NodeDeletionTracker, deleteOptions simulator.NodeDeleteOptions) *ScaleDown { +func NewScaleDown(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules) *ScaleDown { usageTracker := simulator.NewUsageTracker() - removalSimulator := simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, usageTracker, deleteOptions, false) + removalSimulator := simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, usageTracker, deleteOptions, drainabilityRules, false) unremovableNodes := unremovable.NewNodes() resourceLimitsFinder := resource.NewLimitsFinder(processors.CustomResourcesProcessor) return &ScaleDown{ diff --git a/cluster-autoscaler/core/scaledown/legacy/legacy_test.go b/cluster-autoscaler/core/scaledown/legacy/legacy_test.go index be6d7565d4b3..1ba61e250dfd 100644 --- a/cluster-autoscaler/core/scaledown/legacy/legacy_test.go +++ b/cluster-autoscaler/core/scaledown/legacy/legacy_test.go @@ -25,6 +25,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/options" autoscaler_errors "k8s.io/autoscaler/cluster-autoscaler/utils/errors" appsv1 "k8s.io/api/apps/v1" @@ -1287,14 +1288,13 @@ func newWrapperForTesting(ctx *context.AutoscalingContext, clusterStateRegistry if ndt == nil { ndt = deletiontracker.NewNodeDeletionTracker(0 * time.Second) } - deleteOptions := simulator.NodeDeleteOptions{ + deleteOptions := options.NodeDeleteOptions{ SkipNodesWithSystemPods: true, SkipNodesWithLocalStorage: true, - MinReplicaCount: 0, SkipNodesWithCustomControllerPods: true, } processors := NewTestProcessors(ctx) - sd := NewScaleDown(ctx, processors, ndt, deleteOptions) - actuator := actuation.NewActuator(ctx, clusterStateRegistry, ndt, deleteOptions, processors.NodeGroupConfigProcessor) + sd := NewScaleDown(ctx, processors, ndt, deleteOptions, nil) + actuator := actuation.NewActuator(ctx, clusterStateRegistry, ndt, deleteOptions, nil, processors.NodeGroupConfigProcessor) return NewScaleDownWrapper(sd, actuator) } diff --git a/cluster-autoscaler/core/scaledown/planner/planner.go b/cluster-autoscaler/core/scaledown/planner/planner.go index bca5e00ff738..032d93c4f09c 100644 --- a/cluster-autoscaler/core/scaledown/planner/planner.go +++ b/cluster-autoscaler/core/scaledown/planner/planner.go @@ -34,6 +34,8 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/processors/nodes" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" + "k8s.io/autoscaler/cluster-autoscaler/simulator/options" "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" "k8s.io/autoscaler/cluster-autoscaler/simulator/utilization" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" @@ -77,7 +79,7 @@ type Planner struct { } // New creates a new Planner object. 
-func New(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, deleteOptions simulator.NodeDeleteOptions) *Planner { +func New(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules) *Planner { resourceLimitsFinder := resource.NewLimitsFinder(processors.CustomResourcesProcessor) minUpdateInterval := context.AutoscalingOptions.NodeGroupDefaults.ScaleDownUnneededTime if minUpdateInterval == 0*time.Nanosecond { @@ -87,7 +89,7 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling context: context, unremovableNodes: unremovable.NewNodes(), unneededNodes: unneeded.NewNodes(processors.NodeGroupConfigProcessor, resourceLimitsFinder), - rs: simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, simulator.NewUsageTracker(), deleteOptions, true), + rs: simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, simulator.NewUsageTracker(), deleteOptions, drainabilityRules, true), actuationInjector: scheduling.NewHintingSimulator(context.PredicateChecker), eligibilityChecker: eligibility.NewChecker(processors.NodeGroupConfigProcessor), nodeUtilizationMap: make(map[string]utilization.Info), diff --git a/cluster-autoscaler/core/scaledown/planner/planner_test.go b/cluster-autoscaler/core/scaledown/planner/planner_test.go index 2fcf5cc866b3..e92eb1ab4223 100644 --- a/cluster-autoscaler/core/scaledown/planner/planner_test.go +++ b/cluster-autoscaler/core/scaledown/planner/planner_test.go @@ -37,6 +37,7 @@ import ( . "k8s.io/autoscaler/cluster-autoscaler/core/test" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/options" "k8s.io/autoscaler/cluster-autoscaler/simulator/utilization" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" "k8s.io/autoscaler/cluster-autoscaler/utils/taints" @@ -492,8 +493,8 @@ func TestUpdateClusterState(t *testing.T) { }, &fake.Clientset{}, registry, provider, nil, nil) assert.NoError(t, err) clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, tc.nodes, tc.pods) - deleteOptions := simulator.NodeDeleteOptions{} - p := New(&context, NewTestProcessors(&context), deleteOptions) + deleteOptions := options.NodeDeleteOptions{} + p := New(&context, NewTestProcessors(&context), deleteOptions, nil) p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(tc.eligible)} if tc.isSimulationTimeout { context.AutoscalingOptions.ScaleDownSimulationTimeout = 1 * time.Second @@ -611,8 +612,8 @@ func TestUpdateClusterStatUnneededNodesLimit(t *testing.T) { }, &fake.Clientset{}, nil, provider, nil, nil) assert.NoError(t, err) clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, nil) - deleteOptions := simulator.NodeDeleteOptions{} - p := New(&context, NewTestProcessors(&context), deleteOptions) + deleteOptions := options.NodeDeleteOptions{} + p := New(&context, NewTestProcessors(&context), deleteOptions, nil) p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(nodeNames(nodes))} p.minUpdateInterval = tc.updateInterval p.unneededNodes.Update(previouslyUnneeded, time.Now()) @@ -779,8 +780,8 @@ func TestNodesToDelete(t *testing.T) { }, &fake.Clientset{}, nil, provider, nil, nil) assert.NoError(t, err) clustersnapshot.InitializeClusterSnapshotOrDie(t, 
context.ClusterSnapshot, allNodes, nil) - deleteOptions := simulator.NodeDeleteOptions{} - p := New(&context, NewTestProcessors(&context), deleteOptions) + deleteOptions := options.NodeDeleteOptions{} + p := New(&context, NewTestProcessors(&context), deleteOptions, nil) p.latestUpdate = time.Now() p.actuationStatus = deletiontracker.NewNodeDeletionTracker(0 * time.Second) p.unneededNodes.Update(allRemovables, time.Now().Add(-1*time.Hour)) diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index e31a63fc7007..68b870786d02 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -51,6 +51,8 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/processors/status" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" + "k8s.io/autoscaler/cluster-autoscaler/simulator/options" "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" "k8s.io/autoscaler/cluster-autoscaler/utils/backoff" caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors" @@ -142,7 +144,8 @@ func NewStaticAutoscaler( debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter, remainingPdbTracker pdb.RemainingPdbTracker, scaleUpOrchestrator scaleup.Orchestrator, - deleteOptions simulator.NodeDeleteOptions) *StaticAutoscaler { + deleteOptions options.NodeDeleteOptions, + drainabilityRules rules.Rules) *StaticAutoscaler { clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ MaxTotalUnreadyPercentage: opts.MaxTotalUnreadyPercentage, @@ -169,14 +172,14 @@ func NewStaticAutoscaler( // TODO: Populate the ScaleDownActuator/Planner fields in AutoscalingContext // during the struct creation rather than here. ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second) - scaleDown := legacy.NewScaleDown(autoscalingContext, processors, ndt, deleteOptions) - actuator := actuation.NewActuator(autoscalingContext, clusterStateRegistry, ndt, deleteOptions, processors.NodeGroupConfigProcessor) + scaleDown := legacy.NewScaleDown(autoscalingContext, processors, ndt, deleteOptions, drainabilityRules) + actuator := actuation.NewActuator(autoscalingContext, clusterStateRegistry, ndt, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor) autoscalingContext.ScaleDownActuator = actuator var scaleDownPlanner scaledown.Planner var scaleDownActuator scaledown.Actuator if opts.ParallelDrain { - scaleDownPlanner = planner.New(autoscalingContext, processors, deleteOptions) + scaleDownPlanner = planner.New(autoscalingContext, processors, deleteOptions, drainabilityRules) scaleDownActuator = actuator } else { // TODO: Remove the wrapper once the legacy implementation becomes obsolete. 
diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index ace16dde111e..1a6fc141ec46 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -45,6 +45,8 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" + "k8s.io/autoscaler/cluster-autoscaler/simulator/options" "k8s.io/autoscaler/cluster-autoscaler/simulator/utilization" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" @@ -149,9 +151,9 @@ func (m *onNodeGroupDeleteMock) Delete(id string) error { return args.Error(0) } -func setUpScaleDownActuator(ctx *context.AutoscalingContext, options config.AutoscalingOptions) { - deleteOptions := simulator.NewNodeDeleteOptions(options) - ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions, NewTestProcessors(ctx).NodeGroupConfigProcessor) +func setUpScaleDownActuator(ctx *context.AutoscalingContext, autoscalingOptions config.AutoscalingOptions) { + deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions) + ctx.ScaleDownActuator = actuation.NewActuator(ctx, nil, deletiontracker.NewNodeDeletionTracker(0*time.Second), deleteOptions, rules.Default(), NewTestProcessors(ctx).NodeGroupConfigProcessor) } func TestStaticAutoscalerRunOnce(t *testing.T) { @@ -1447,11 +1449,11 @@ func TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) { kubernetes.NewTestPodLister(nil), kubernetes.NewTestPodDisruptionBudgetLister(nil), daemonSetLister, nil, nil, nil, nil) - // Create context with minimal options that guarantee we reach the tested logic. - // We're only testing the input to UpdateClusterState which should be called whenever scale-down is enabled, other options shouldn't matter. - options := config.AutoscalingOptions{ScaleDownEnabled: true} + // Create context with minimal autoscalingOptions that guarantee we reach the tested logic. + // We're only testing the input to UpdateClusterState which should be called whenever scale-down is enabled, other autoscalingOptions shouldn't matter. + autoscalingOptions := config.AutoscalingOptions{ScaleDownEnabled: true} processorCallbacks := newStaticAutoscalerProcessorCallbacks() - ctx, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listerRegistry, provider, processorCallbacks, nil) + ctx, err := NewScaleTestAutoscalingContext(autoscalingOptions, &fake.Clientset{}, listerRegistry, provider, processorCallbacks, nil) assert.NoError(t, err) // Create CSR with unhealthy cluster protection effectively disabled, to guarantee we reach the tested logic. @@ -1459,7 +1461,7 @@ func TestStaticAutoscalerUpcomingScaleDownCandidates(t *testing.T) { csr := clusterstate.NewClusterStateRegistry(provider, csrConfig, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute})) // Setting the Actuator is necessary for testing any scale-down logic, it shouldn't have anything to do in this test. 
- actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), simulator.NodeDeleteOptions{}, NewTestProcessors(&ctx).NodeGroupConfigProcessor) + actuator := actuation.NewActuator(&ctx, csr, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, NewTestProcessors(&ctx).NodeGroupConfigProcessor) ctx.ScaleDownActuator = actuator // Fake planner that keeps track of the scale-down candidates passed to UpdateClusterState. @@ -1847,15 +1849,14 @@ func newScaleDownPlannerAndActuator(t *testing.T, ctx *context.AutoscalingContex ctx.MaxDrainParallelism = 1 ctx.NodeDeletionBatcherInterval = 0 * time.Second ctx.NodeDeleteDelayAfterTaint = 1 * time.Second - deleteOptions := simulator.NodeDeleteOptions{ + deleteOptions := options.NodeDeleteOptions{ SkipNodesWithSystemPods: true, SkipNodesWithLocalStorage: true, - MinReplicaCount: 0, SkipNodesWithCustomControllerPods: true, } ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second) - sd := legacy.NewScaleDown(ctx, p, ndt, deleteOptions) - actuator := actuation.NewActuator(ctx, cs, ndt, deleteOptions, p.NodeGroupConfigProcessor) + sd := legacy.NewScaleDown(ctx, p, ndt, deleteOptions, nil) + actuator := actuation.NewActuator(ctx, cs, ndt, deleteOptions, nil, p.NodeGroupConfigProcessor) wrapper := legacy.NewScaleDownWrapper(sd, actuator) return wrapper, wrapper } diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 9e021b8963c3..6855ca1c6c49 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -29,10 +29,6 @@ import ( "syscall" "time" - "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" - "k8s.io/autoscaler/cluster-autoscaler/simulator" - "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" - "github.com/spf13/pflag" "k8s.io/apimachinery/pkg/api/meta" @@ -45,6 +41,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/config" "k8s.io/autoscaler/cluster-autoscaler/core" "k8s.io/autoscaler/cluster-autoscaler/core/podlistprocessor" + "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" "k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/autoscaler/cluster-autoscaler/expander" "k8s.io/autoscaler/cluster-autoscaler/metrics" @@ -55,6 +52,9 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/emptycandidates" "k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/previouscandidates" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" + "k8s.io/autoscaler/cluster-autoscaler/simulator/options" + "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" @@ -68,7 +68,7 @@ import ( "k8s.io/client-go/tools/leaderelection/resourcelock" kube_flag "k8s.io/component-base/cli/flag" componentbaseconfig "k8s.io/component-base/config" - "k8s.io/component-base/config/options" + componentopts "k8s.io/component-base/config/options" "k8s.io/component-base/logs" logsapi "k8s.io/component-base/logs/api/v1" _ "k8s.io/component-base/logs/json/register" @@ -461,7 +461,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter if err != nil { return nil, err } - deleteOptions := simulator.NewNodeDeleteOptions(autoscalingOptions) + deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions) 
opts := core.AutoscalerOptions{ AutoscalingOptions: autoscalingOptions, @@ -481,7 +481,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter if autoscalingOptions.ParallelDrain { sdCandidatesSorting := previouscandidates.NewPreviousCandidates() scaleDownCandidatesComparers = []scaledowncandidates.CandidatesComparer{ - emptycandidates.NewEmptySortingProcessor(emptycandidates.NewNodeInfoGetter(opts.ClusterSnapshot), deleteOptions), + emptycandidates.NewEmptySortingProcessor(emptycandidates.NewNodeInfoGetter(opts.ClusterSnapshot), deleteOptions, rules.Default()), sdCandidatesSorting, } opts.Processors.ScaleDownCandidatesNotifier.Register(sdCandidatesSorting) @@ -575,7 +575,7 @@ func main() { leaderElection := defaultLeaderElectionConfiguration() leaderElection.LeaderElect = true - options.BindLeaderElectionFlags(&leaderElection, pflag.CommandLine) + componentopts.BindLeaderElectionFlags(&leaderElection, pflag.CommandLine) featureGate := utilfeature.DefaultMutableFeatureGate loggingConfig := logsapi.NewLoggingConfiguration() diff --git a/cluster-autoscaler/processors/scaledowncandidates/emptycandidates/empty_candidates_sorting.go b/cluster-autoscaler/processors/scaledowncandidates/emptycandidates/empty_candidates_sorting.go index 38dbeec3071a..8ad745648c2a 100644 --- a/cluster-autoscaler/processors/scaledowncandidates/emptycandidates/empty_candidates_sorting.go +++ b/cluster-autoscaler/processors/scaledowncandidates/emptycandidates/empty_candidates_sorting.go @@ -22,6 +22,8 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" + "k8s.io/autoscaler/cluster-autoscaler/simulator/options" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) @@ -45,12 +47,17 @@ func NewNodeInfoGetter(c clustersnapshot.ClusterSnapshot) *nodeInfoGetterImpl { // EmptySorting is sorting scale down candidates so that empty nodes appear first. type EmptySorting struct { nodeInfoGetter - deleteOptions simulator.NodeDeleteOptions + deleteOptions options.NodeDeleteOptions + drainabilityRules rules.Rules } // NewEmptySortingProcessor return EmptySorting struct. -func NewEmptySortingProcessor(n nodeInfoGetter, deleteOptions simulator.NodeDeleteOptions) *EmptySorting { - return &EmptySorting{n, deleteOptions} +func NewEmptySortingProcessor(n nodeInfoGetter, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules) *EmptySorting { + return &EmptySorting{ + nodeInfoGetter: n, + deleteOptions: deleteOptions, + drainabilityRules: drainabilityRules, + } } // ScaleDownEarlierThan return true if node1 is empty and node2 isn't. 
@@ -66,7 +73,7 @@ func (p *EmptySorting) isNodeEmpty(node *apiv1.Node) bool { if err != nil { return false } - podsToRemove, _, _, err := simulator.GetPodsToMove(nodeInfo, p.deleteOptions, nil, nil, time.Now()) + podsToRemove, _, _, err := simulator.GetPodsToMove(nodeInfo, p.deleteOptions, p.drainabilityRules, nil, nil, time.Now()) if err == nil && len(podsToRemove) == 0 { return true } diff --git a/cluster-autoscaler/processors/scaledowncandidates/emptycandidates/empty_candidates_sorting_test.go b/cluster-autoscaler/processors/scaledowncandidates/emptycandidates/empty_candidates_sorting_test.go index 4aedcd7ea15c..469ddcd81382 100644 --- a/cluster-autoscaler/processors/scaledowncandidates/emptycandidates/empty_candidates_sorting_test.go +++ b/cluster-autoscaler/processors/scaledowncandidates/emptycandidates/empty_candidates_sorting_test.go @@ -21,7 +21,7 @@ import ( "testing" v1 "k8s.io/api/core/v1" - "k8s.io/autoscaler/cluster-autoscaler/simulator" + "k8s.io/autoscaler/cluster-autoscaler/simulator/options" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) @@ -62,13 +62,15 @@ func TestScaleDownEarlierThan(t *testing.T) { niGetter := testNodeInfoGetter{map[string]*schedulerframework.NodeInfo{nodeEmptyName: niEmpty, nodeNonEmptyName: niNonEmpty, nodeEmptyName2: niEmpty2}} - deleteOptions := simulator.NodeDeleteOptions{ + deleteOptions := options.NodeDeleteOptions{ SkipNodesWithSystemPods: true, SkipNodesWithLocalStorage: true, - MinReplicaCount: 0, SkipNodesWithCustomControllerPods: true, } - p := EmptySorting{&niGetter, deleteOptions} + p := EmptySorting{ + nodeInfoGetter: &niGetter, + deleteOptions: deleteOptions, + } tests := []struct { name string @@ -95,22 +97,19 @@ func TestScaleDownEarlierThan(t *testing.T) { wantEarlier: true, }, { - name: "Non-empty node is not earlier that node without nodeInfo", - node1: nodeNonEmpty, - node2: noNodeInfoNode, - wantEarlier: false, + name: "Non-empty node is not earlier that node without nodeInfo", + node1: nodeNonEmpty, + node2: noNodeInfoNode, }, { - name: "Node without nodeInfo is not earlier that non-empty node", - node1: noNodeInfoNode, - node2: nodeNonEmpty, - wantEarlier: false, + name: "Node without nodeInfo is not earlier that non-empty node", + node1: noNodeInfoNode, + node2: nodeNonEmpty, }, { - name: "Empty node is not earlier that another empty node", - node1: nodeEmpty, - node2: nodeEmpty2, - wantEarlier: false, + name: "Empty node is not earlier that another empty node", + node1: nodeEmpty, + node2: nodeEmpty2, }, } for _, test := range tests { diff --git a/cluster-autoscaler/simulator/cluster.go b/cluster-autoscaler/simulator/cluster.go index 1af916b64bf9..2bb77b7aace8 100644 --- a/cluster-autoscaler/simulator/cluster.go +++ b/cluster-autoscaler/simulator/cluster.go @@ -22,6 +22,8 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" + "k8s.io/autoscaler/cluster-autoscaler/simulator/options" "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" "k8s.io/autoscaler/cluster-autoscaler/utils/drain" @@ -95,19 +97,21 @@ type RemovalSimulator struct { clusterSnapshot clustersnapshot.ClusterSnapshot usageTracker *UsageTracker canPersist bool - deleteOptions NodeDeleteOptions + deleteOptions options.NodeDeleteOptions + drainabilityRules rules.Rules 
schedulingSimulator *scheduling.HintingSimulator } // NewRemovalSimulator returns a new RemovalSimulator. func NewRemovalSimulator(listers kube_util.ListerRegistry, clusterSnapshot clustersnapshot.ClusterSnapshot, predicateChecker predicatechecker.PredicateChecker, - usageTracker *UsageTracker, deleteOptions NodeDeleteOptions, persistSuccessfulSimulations bool) *RemovalSimulator { + usageTracker *UsageTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, persistSuccessfulSimulations bool) *RemovalSimulator { return &RemovalSimulator{ listers: listers, clusterSnapshot: clusterSnapshot, usageTracker: usageTracker, canPersist: persistSuccessfulSimulations, deleteOptions: deleteOptions, + drainabilityRules: drainabilityRules, schedulingSimulator: scheduling.NewHintingSimulator(predicateChecker), } } @@ -159,7 +163,7 @@ func (r *RemovalSimulator) SimulateNodeRemoval( return nil, &UnremovableNode{Node: nodeInfo.Node(), Reason: UnexpectedError} } - podsToRemove, daemonSetPods, blockingPod, err := GetPodsToMove(nodeInfo, r.deleteOptions, r.listers, remainingPdbTracker, timestamp) + podsToRemove, daemonSetPods, blockingPod, err := GetPodsToMove(nodeInfo, r.deleteOptions, r.drainabilityRules, r.listers, remainingPdbTracker, timestamp) if err != nil { klog.V(2).Infof("node %s cannot be removed: %v", nodeName, err) if blockingPod != nil { @@ -193,7 +197,7 @@ func (r *RemovalSimulator) FindEmptyNodesToRemove(candidates []string, timestamp continue } // Should block on all pods - podsToRemove, _, _, err := GetPodsToMove(nodeInfo, r.deleteOptions, nil, nil, timestamp) + podsToRemove, _, _, err := GetPodsToMove(nodeInfo, r.deleteOptions, r.drainabilityRules, nil, nil, timestamp) if err == nil && len(podsToRemove) == 0 { result = append(result, node) } diff --git a/cluster-autoscaler/simulator/cluster_test.go b/cluster-autoscaler/simulator/cluster_test.go index 8c141eef56cf..136e0d47d2a9 100644 --- a/cluster-autoscaler/simulator/cluster_test.go +++ b/cluster-autoscaler/simulator/cluster_test.go @@ -22,6 +22,7 @@ import ( "time" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/options" "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" "k8s.io/autoscaler/cluster-autoscaler/utils/drain" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" @@ -58,7 +59,7 @@ func TestFindEmptyNodes(t *testing.T) { clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() clustersnapshot.InitializeClusterSnapshotOrDie(t, clusterSnapshot, []*apiv1.Node{nodes[0], nodes[1], nodes[2], nodes[3]}, []*apiv1.Pod{pod1, pod2}) testTime := time.Date(2020, time.December, 18, 17, 0, 0, 0, time.UTC) - r := NewRemovalSimulator(nil, clusterSnapshot, nil, nil, testDeleteOptions(), false) + r := NewRemovalSimulator(nil, clusterSnapshot, nil, nil, testDeleteOptions(), nil, false) emptyNodes := r.FindEmptyNodesToRemove(nodeNames, testTime) assert.Equal(t, []string{nodeNames[0], nodeNames[2], nodeNames[3]}, emptyNodes) } @@ -205,7 +206,7 @@ func TestFindNodesToRemove(t *testing.T) { destinations = append(destinations, node.Name) } clustersnapshot.InitializeClusterSnapshotOrDie(t, clusterSnapshot, test.allNodes, test.pods) - r := NewRemovalSimulator(registry, clusterSnapshot, predicateChecker, tracker, testDeleteOptions(), false) + r := NewRemovalSimulator(registry, clusterSnapshot, predicateChecker, tracker, testDeleteOptions(), nil, false) toRemove, unremovable := r.FindNodesToRemove(test.candidates, destinations, time.Now(), 
nil) fmt.Printf("Test scenario: %s, found len(toRemove)=%v, expected len(test.toRemove)=%v\n", test.name, len(toRemove), len(test.toRemove)) assert.Equal(t, toRemove, test.toRemove) @@ -214,11 +215,10 @@ func TestFindNodesToRemove(t *testing.T) { } } -func testDeleteOptions() NodeDeleteOptions { - return NodeDeleteOptions{ +func testDeleteOptions() options.NodeDeleteOptions { + return options.NodeDeleteOptions{ SkipNodesWithSystemPods: true, SkipNodesWithLocalStorage: true, - MinReplicaCount: 0, SkipNodesWithCustomControllerPods: true, } } diff --git a/cluster-autoscaler/simulator/drain.go b/cluster-autoscaler/simulator/drain.go index 55090023debf..a5d37bbc397a 100644 --- a/cluster-autoscaler/simulator/drain.go +++ b/cluster-autoscaler/simulator/drain.go @@ -24,42 +24,16 @@ import ( policyv1 "k8s.io/api/policy/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/autoscaler/cluster-autoscaler/config" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb" "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability" "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" + "k8s.io/autoscaler/cluster-autoscaler/simulator/options" "k8s.io/autoscaler/cluster-autoscaler/utils/drain" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" ) -// NodeDeleteOptions contains various options to customize how draining will behave -type NodeDeleteOptions struct { - // SkipNodesWithSystemPods tells if nodes with pods from kube-system should be deleted (except for DaemonSet or mirror pods) - SkipNodesWithSystemPods bool - // SkipNodesWithLocalStorage tells if nodes with pods with local storage, e.g. EmptyDir or HostPath, should be deleted - SkipNodesWithLocalStorage bool - // SkipNodesWithCustomControllerPods tells if nodes with custom-controller owned pods should be skipped from deletion (skip if 'true') - SkipNodesWithCustomControllerPods bool - // MinReplicaCount controls the minimum number of replicas that a replica set or replication controller should have - // to allow their pods deletion in scale down - MinReplicaCount int - // DrainabilityRules contain a list of checks that are used to verify whether a pod can be drained from node. - DrainabilityRules rules.Rules -} - -// NewNodeDeleteOptions returns new node delete options extracted from autoscaling options -func NewNodeDeleteOptions(opts config.AutoscalingOptions) NodeDeleteOptions { - return NodeDeleteOptions{ - SkipNodesWithSystemPods: opts.SkipNodesWithSystemPods, - SkipNodesWithLocalStorage: opts.SkipNodesWithLocalStorage, - MinReplicaCount: opts.MinReplicaCount, - SkipNodesWithCustomControllerPods: opts.SkipNodesWithCustomControllerPods, - DrainabilityRules: rules.Default(), - } -} - // GetPodsToMove returns a list of pods that should be moved elsewhere // and a list of DaemonSet pods that should be evicted if the node // is drained. Raises error if there is an unreplicated pod. @@ -68,10 +42,8 @@ func NewNodeDeleteOptions(opts config.AutoscalingOptions) NodeDeleteOptions { // If listers is not nil it checks whether RC, DS, Jobs and RS that created these pods // still exist. // TODO(x13n): Rewrite GetPodsForDeletionOnNodeDrain into a set of DrainabilityRules. 
-func GetPodsToMove(nodeInfo *schedulerframework.NodeInfo, deleteOptions NodeDeleteOptions, listers kube_util.ListerRegistry, - remainingPdbTracker pdb.RemainingPdbTracker, timestamp time.Time) (pods []*apiv1.Pod, daemonSetPods []*apiv1.Pod, blockingPod *drain.BlockingPod, err error) { +func GetPodsToMove(nodeInfo *schedulerframework.NodeInfo, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, listers kube_util.ListerRegistry, remainingPdbTracker pdb.RemainingPdbTracker, timestamp time.Time) (pods []*apiv1.Pod, daemonSetPods []*apiv1.Pod, blockingPod *drain.BlockingPod, err error) { var drainPods, drainDs []*apiv1.Pod - drainabilityRules := deleteOptions.DrainabilityRules if drainabilityRules == nil { drainabilityRules = rules.Default() } @@ -80,6 +52,7 @@ func GetPodsToMove(nodeInfo *schedulerframework.NodeInfo, deleteOptions NodeDele } drainCtx := &drainability.DrainContext{ RemainingPdbTracker: remainingPdbTracker, + DeleteOptions: deleteOptions, } for _, podInfo := range nodeInfo.Pods { pod := podInfo.Pod diff --git a/cluster-autoscaler/simulator/drain_test.go b/cluster-autoscaler/simulator/drain_test.go index 32f4d64caadc..5e4611a9e104 100644 --- a/cluster-autoscaler/simulator/drain_test.go +++ b/cluster-autoscaler/simulator/drain_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb" "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability" "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" + "k8s.io/autoscaler/cluster-autoscaler/simulator/options" "k8s.io/autoscaler/cluster-autoscaler/utils/drain" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" "k8s.io/kubernetes/pkg/kubelet/types" @@ -306,16 +307,14 @@ func TestGetPodsToMove(t *testing.T) { } for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - deleteOptions := NodeDeleteOptions{ + deleteOptions := options.NodeDeleteOptions{ SkipNodesWithSystemPods: true, SkipNodesWithLocalStorage: true, - MinReplicaCount: 0, SkipNodesWithCustomControllerPods: true, - DrainabilityRules: tc.rules, } tracker := pdb.NewBasicRemainingPdbTracker() tracker.SetPdbs(tc.pdbs) - p, d, b, err := GetPodsToMove(schedulerframework.NewNodeInfo(tc.pods...), deleteOptions, nil, tracker, testTime) + p, d, b, err := GetPodsToMove(schedulerframework.NewNodeInfo(tc.pods...), deleteOptions, tc.rules, nil, tracker, testTime) if tc.wantErr { assert.Error(t, err) } else { diff --git a/cluster-autoscaler/simulator/drainability/context.go b/cluster-autoscaler/simulator/drainability/context.go index c2a341a84f8d..84a4ec4c454f 100644 --- a/cluster-autoscaler/simulator/drainability/context.go +++ b/cluster-autoscaler/simulator/drainability/context.go @@ -18,9 +18,11 @@ package drainability import ( "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb" + "k8s.io/autoscaler/cluster-autoscaler/simulator/options" ) // DrainContext contains parameters for drainability rules. type DrainContext struct { RemainingPdbTracker pdb.RemainingPdbTracker + DeleteOptions options.NodeDeleteOptions } diff --git a/cluster-autoscaler/simulator/options/nodedelete.go b/cluster-autoscaler/simulator/options/nodedelete.go new file mode 100644 index 000000000000..947095d6eb78 --- /dev/null +++ b/cluster-autoscaler/simulator/options/nodedelete.go @@ -0,0 +1,48 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package options + +import ( + "k8s.io/autoscaler/cluster-autoscaler/config" +) + +// NodeDeleteOptions contains various options to customize how draining will behave +type NodeDeleteOptions struct { + // SkipNodesWithSystemPods is true if nodes with kube-system pods should be + // deleted (except for DaemonSet or mirror pods). + SkipNodesWithSystemPods bool + // SkipNodesWithLocalStorage is true if nodes with pods using local storage + // (e.g. EmptyDir or HostPath) should be deleted. + SkipNodesWithLocalStorage bool + // SkipNodesWithCustomControllerPods is true if nodes with + // custom-controller-owned pods should be skipped. + SkipNodesWithCustomControllerPods bool + // MinReplicaCount determines the minimum number of replicas that a replica + // set or replication controller should have to allow pod deletion during + // scale down. + MinReplicaCount int +} + +// NewNodeDeleteOptions returns new node delete options extracted from autoscaling options. +func NewNodeDeleteOptions(opts config.AutoscalingOptions) NodeDeleteOptions { + return NodeDeleteOptions{ + SkipNodesWithSystemPods: opts.SkipNodesWithSystemPods, + SkipNodesWithLocalStorage: opts.SkipNodesWithLocalStorage, + MinReplicaCount: opts.MinReplicaCount, + SkipNodesWithCustomControllerPods: opts.SkipNodesWithCustomControllerPods, + } +} From 9ea5a36dc988a04c0c2b7feed3853ecaca4a3d48 Mon Sep 17 00:00:00 2001 From: Artem Minyaylov Date: Fri, 29 Sep 2023 17:47:46 +0000 Subject: [PATCH 3/3] Replace old pdb check with remainingPdbTracker --- cluster-autoscaler/simulator/drain.go | 27 +++------------------------ 1 file changed, 3 insertions(+), 24 deletions(-) diff --git a/cluster-autoscaler/simulator/drain.go b/cluster-autoscaler/simulator/drain.go index a5d37bbc397a..7d8e5c449655 100644 --- a/cluster-autoscaler/simulator/drain.go +++ b/cluster-autoscaler/simulator/drain.go @@ -21,9 +21,6 @@ import ( "time" apiv1 "k8s.io/api/core/v1" - policyv1 "k8s.io/api/policy/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb" "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability" "k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules" @@ -90,28 +87,10 @@ func GetPodsToMove(nodeInfo *schedulerframework.NodeInfo, deleteOptions options. if err != nil { return pods, daemonSetPods, blockingPod, err } - if pdbBlockingPod, err := checkPdbs(pods, remainingPdbTracker.GetPdbs()); err != nil { - return []*apiv1.Pod{}, []*apiv1.Pod{}, pdbBlockingPod, err + if canRemove, _, blockingPodInfo := remainingPdbTracker.CanRemovePods(pods); !canRemove { + pod := blockingPodInfo.Pod + return []*apiv1.Pod{}, []*apiv1.Pod{}, blockingPodInfo, fmt.Errorf("not enough pod disruption budget to move %s/%s", pod.Namespace, pod.Name) } return pods, daemonSetPods, nil, nil } - -func checkPdbs(pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget) (*drain.BlockingPod, error) { - // TODO: remove it after deprecating legacy scale down. - // RemainingPdbTracker.CanRemovePods() to replace this function. 
- for _, pdb := range pdbs { - selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector) - if err != nil { - return nil, err - } - for _, pod := range pods { - if pod.Namespace == pdb.Namespace && selector.Matches(labels.Set(pod.Labels)) { - if pdb.Status.DisruptionsAllowed < 1 { - return &drain.BlockingPod{Pod: pod, Reason: drain.NotEnoughPdb}, fmt.Errorf("not enough pod disruption budget to move %s/%s", pod.Namespace, pod.Name) - } - } - } - } - return nil, nil -}
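
For downstream callers of the simulator package, the net effect of this series is easiest to see at a single call site. The sketch below is illustrative only: it relies on the signatures visible in the hunks above (options.NewNodeDeleteOptions, rules.Default(), pdb.NewBasicRemainingPdbTracker, and the six-argument simulator.GetPodsToMove), while the wrapper function podsToEvict and its parameter names are hypothetical and do not exist in the tree.

package example

import (
	"time"

	apiv1 "k8s.io/api/core/v1"
	policyv1 "k8s.io/api/policy/v1"

	"k8s.io/autoscaler/cluster-autoscaler/config"
	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
	"k8s.io/autoscaler/cluster-autoscaler/simulator"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// podsToEvict shows a call site migrated to the post-series API: delete
// options come from simulator/options, drainability rules are an explicit
// argument, and PDBs are wrapped in a RemainingPdbTracker instead of being
// passed as a raw slice.
func podsToEvict(
	autoscalingOptions config.AutoscalingOptions,
	nodeInfo *schedulerframework.NodeInfo,
	listers kube_util.ListerRegistry,
	pdbs []*policyv1.PodDisruptionBudget,
) ([]*apiv1.Pod, error) {
	// MinReplicaCount still lives in NodeDeleteOptions; only
	// DrainabilityRules moved out of it.
	deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)

	tracker := pdb.NewBasicRemainingPdbTracker()
	tracker.SetPdbs(pdbs)

	// rules.Default() can also be replaced by nil; GetPodsToMove falls back
	// to the default rule set in that case.
	pods, _, _, err := simulator.GetPodsToMove(
		nodeInfo, deleteOptions, rules.Default(), listers, tracker, time.Now())
	return pods, err
}

Callers that are happy with the stock behavior can pass nil for the rules argument, as the updated tests and EvictDaemonSetPods do; GetPodsToMove then substitutes rules.Default() internally.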
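
The final commit drops the hand-rolled checkPdbs loop in favor of RemainingPdbTracker.CanRemovePods. Assuming the tracker reports the blocking pod with the same drain.NotEnoughPdb reason that the deleted helper set explicitly, the old helper collapses to the thin wrapper sketched below; checkPdbsViaTracker is a hypothetical name used only to make the mapping explicit.

package example

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
	"k8s.io/autoscaler/cluster-autoscaler/utils/drain"
)

// checkPdbsViaTracker reproduces the deleted checkPdbs behavior on top of
// the tracker API: it reports the first pod whose PDB would be violated.
func checkPdbsViaTracker(tracker pdb.RemainingPdbTracker, pods []*apiv1.Pod) (*drain.BlockingPod, error) {
	canRemove, _, blockingPod := tracker.CanRemovePods(pods)
	if canRemove {
		return nil, nil
	}
	// The blocking pod comes straight from the tracker; the deleted loop
	// used drain.NotEnoughPdb, and the tracker is assumed to report the
	// same reason.
	return blockingPod, fmt.Errorf("not enough pod disruption budget to move %s/%s",
		blockingPod.Pod.Namespace, blockingPod.Pod.Name)
}

One behavioral difference visible in the hunk: the deleted loop could surface a selector-parsing error from metav1.LabelSelectorAsSelector, while on the new path any selector handling happens inside the tracker and no longer bubbles out of GetPodsToMove.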