Extract drainability rules into packages
artemvmin committed Sep 28, 2023
1 parent fbe25e1 commit 53e5b5f
Showing 14 changed files with 165 additions and 91 deletions.
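In short, the per-pod drainability checks move out of the drainability package's helper into a new simulator/drainability/rules package, and pod disruption budget (PDB) state is now carried through the scale-down paths by a pdb.RemainingPdbTracker (from core/scaledown/pdb) instead of raw []*policyv1.PodDisruptionBudget slices. Below is a minimal sketch of the new call shape, inferred from the call sites in the hunks that follow; the helper name blocksDrain is hypothetical and the return type of Drainable is assumed to be drainability.Status, since the new package files themselves are not shown in this view.

package example

import (
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
)

// blocksDrain is a hypothetical helper showing the new call shape: the default
// rule set comes from rules.Default() (previously drainability.DefaultRules()),
// and PDB state travels inside a DrainContext rather than as a PDB slice.
func blocksDrain(pod *apiv1.Pod, tracker pdb.RemainingPdbTracker) bool {
	drainCtx := &drainability.DrainContext{
		RemainingPdbTracker: tracker,
	}
	status := rules.Default().Drainable(drainCtx, pod)
	return status.Outcome == drainability.BlockDrain
}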
11 changes: 6 additions & 5 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -21,7 +21,6 @@ import (
"time"

apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/klog/v2"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
@@ -30,6 +29,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/budgets"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
@@ -223,7 +223,7 @@ func (a *Actuator) deleteAsyncDrain(NodeGroupViews []*budgets.NodeGroupView) (re
}

func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider.NodeGroup, drain bool, batchSize int) {
var pdbs []*policyv1.PodDisruptionBudget
var remainingPdbTracker pdb.RemainingPdbTracker
var registry kube_util.ListerRegistry

if len(nodes) == 0 {
@@ -246,7 +246,7 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider
}

if drain {
pdbs, err = a.ctx.PodDisruptionBudgetLister().List()
pdbs, err := a.ctx.PodDisruptionBudgetLister().List()
if err != nil {
klog.Errorf("Scale-down: couldn't fetch pod disruption budgets, err: %v", err)
nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "podDisruptionBudgetLister.List returned error %v", err)}
@@ -255,7 +255,8 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider
}
return
}

remainingPdbTracker = pdb.NewBasicRemainingPdbTracker()
remainingPdbTracker.SetPdbs(pdbs)
registry = a.ctx.ListerRegistry
}

@@ -272,7 +273,7 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider
continue
}

podsToRemove, _, _, err := simulator.GetPodsToMove(nodeInfo, a.deleteOptions, registry, pdbs, time.Now())
podsToRemove, _, _, err := simulator.GetPodsToMove(nodeInfo, a.deleteOptions, registry, remainingPdbTracker, time.Now())
if err != nil {
klog.Errorf("Scale-down: couldn't delete node %q, err: %v", node.Name, err)
nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "GetPodsToMove for %q returned error: %v", node.Name, err)}
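Taken together, the drain path of deleteNodesAsync now builds the tracker once from the listed PDBs and hands it to GetPodsToMove, instead of threading the raw PDB slice through. A condensed sketch of that flow follows; the standalone helper podsToEvict is hypothetical, and the real code reports failures via status.NodeDeleteResult rather than returning an error.

package example

import (
	"time"

	apiv1 "k8s.io/api/core/v1"
	policyv1 "k8s.io/api/policy/v1"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"

	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
	"k8s.io/autoscaler/cluster-autoscaler/simulator"
	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
)

// podsToEvict wraps the listed PDBs in a RemainingPdbTracker and passes the
// tracker to GetPodsToMove in place of the old []*policyv1.PodDisruptionBudget
// argument.
func podsToEvict(nodeInfo *schedulerframework.NodeInfo, deleteOptions simulator.NodeDeleteOptions,
	registry kube_util.ListerRegistry, pdbs []*policyv1.PodDisruptionBudget) ([]*apiv1.Pod, error) {

	remainingPdbTracker := pdb.NewBasicRemainingPdbTracker()
	remainingPdbTracker.SetPdbs(pdbs)

	podsToRemove, _, _, err := simulator.GetPodsToMove(nodeInfo, deleteOptions, registry, remainingPdbTracker, time.Now())
	return podsToRemove, err
}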
3 changes: 1 addition & 2 deletions cluster-autoscaler/core/scaledown/actuation/drain.go
@@ -23,7 +23,6 @@ import (
"time"

apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
kube_errors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -177,7 +176,7 @@ func (e Evictor) DrainNodeWithPods(ctx *acontext.AutoscalingContext, node *apiv1
// EvictDaemonSetPods creates eviction objects for all DaemonSet pods on the node.
func (e Evictor) EvictDaemonSetPods(ctx *acontext.AutoscalingContext, nodeInfo *framework.NodeInfo, timeNow time.Time) error {
nodeToDelete := nodeInfo.Node()
_, daemonSetPods, _, err := simulator.GetPodsToMove(nodeInfo, e.deleteOptions, nil, []*policyv1.PodDisruptionBudget{}, timeNow)
_, daemonSetPods, _, err := simulator.GetPodsToMove(nodeInfo, e.deleteOptions, nil, nil, timeNow)
if err != nil {
return fmt.Errorf("failed to get DaemonSet pods for %s (error: %v)", nodeToDelete.Name, err)
}
6 changes: 3 additions & 3 deletions cluster-autoscaler/core/scaledown/legacy/legacy.go
@@ -147,7 +147,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
currentCandidates,
destinations,
timestamp,
sd.context.RemainingPdbTracker.GetPdbs())
sd.context.RemainingPdbTracker)

additionalCandidatesCount := sd.context.ScaleDownNonEmptyCandidatesCount - len(nodesToRemove)
if additionalCandidatesCount > len(currentNonCandidates) {
@@ -169,7 +169,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
currentNonCandidates[:additionalCandidatesPoolSize],
destinations,
timestamp,
sd.context.RemainingPdbTracker.GetPdbs())
sd.context.RemainingPdbTracker)
if len(additionalNodesToRemove) > additionalCandidatesCount {
additionalNodesToRemove = additionalNodesToRemove[:additionalCandidatesCount]
}
@@ -317,7 +317,7 @@ func (sd *ScaleDown) NodesToDelete(currentTime time.Time) (_, drain []*apiv1.Nod
candidateNames,
allNodeNames,
time.Now(),
sd.context.RemainingPdbTracker.GetPdbs())
sd.context.RemainingPdbTracker)
findNodesToRemoveDuration = time.Now().Sub(findNodesToRemoveStart)

for _, unremovableNode := range unremovable {
6 changes: 3 additions & 3 deletions cluster-autoscaler/core/scaledown/planner/planner.go
@@ -22,11 +22,11 @@ import (
"time"

apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unneeded"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
@@ -47,7 +47,7 @@ type eligibilityChecker interface {

type removalSimulator interface {
DropOldHints()
SimulateNodeRemoval(node string, podDestinations map[string]bool, timestamp time.Time, pdbs []*policyv1.PodDisruptionBudget) (*simulator.NodeToBeRemoved, *simulator.UnremovableNode)
SimulateNodeRemoval(node string, podDestinations map[string]bool, timestamp time.Time, remainingPdbTracker pdb.RemainingPdbTracker) (*simulator.NodeToBeRemoved, *simulator.UnremovableNode)
}

// controllerReplicasCalculator calculates a number of target and expected replicas for a given controller.
@@ -276,7 +276,7 @@ func (p *Planner) categorizeNodes(podDestinations map[string]bool, scaleDownCand
klog.V(4).Infof("%d out of %d nodes skipped in scale down simulation: there are already %d unneeded nodes so no point in looking for more.", len(currentlyUnneededNodeNames)-i, len(currentlyUnneededNodeNames), len(removableList))
break
}
removable, unremovable := p.rs.SimulateNodeRemoval(node, podDestinations, p.latestUpdate, p.context.RemainingPdbTracker.GetPdbs())
removable, unremovable := p.rs.SimulateNodeRemoval(node, podDestinations, p.latestUpdate, p.context.RemainingPdbTracker)
if removable != nil {
_, inParallel, _ := p.context.RemainingPdbTracker.CanRemovePods(removable.PodsToReschedule)
if !inParallel {
4 changes: 2 additions & 2 deletions cluster-autoscaler/core/scaledown/planner/planner_test.go
@@ -24,14 +24,14 @@ import (
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
@@ -901,7 +901,7 @@ type fakeRemovalSimulator struct {

func (r *fakeRemovalSimulator) DropOldHints() {}

func (r *fakeRemovalSimulator) SimulateNodeRemoval(name string, _ map[string]bool, _ time.Time, _ []*policyv1.PodDisruptionBudget) (*simulator.NodeToBeRemoved, *simulator.UnremovableNode) {
func (r *fakeRemovalSimulator) SimulateNodeRemoval(name string, _ map[string]bool, _ time.Time, _ pdb.RemainingPdbTracker) (*simulator.NodeToBeRemoved, *simulator.UnremovableNode) {
time.Sleep(r.sleep)
node := &apiv1.Node{}
for _, n := range r.nodes {
10 changes: 5 additions & 5 deletions cluster-autoscaler/simulator/cluster.go
@@ -20,6 +20,7 @@ import (
"fmt"
"time"

"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
Expand All @@ -29,7 +30,6 @@ import (
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"

apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"

klog "k8s.io/klog/v2"
)
@@ -117,7 +117,7 @@ func (r *RemovalSimulator) FindNodesToRemove(
candidates []string,
destinations []string,
timestamp time.Time,
pdbs []*policyv1.PodDisruptionBudget,
remainingPdbTracker pdb.RemainingPdbTracker,
) (nodesToRemove []NodeToBeRemoved, unremovableNodes []*UnremovableNode) {
result := make([]NodeToBeRemoved, 0)
unremovable := make([]*UnremovableNode, 0)
@@ -128,7 +128,7 @@ }
}

for _, nodeName := range candidates {
rn, urn := r.SimulateNodeRemoval(nodeName, destinationMap, timestamp, pdbs)
rn, urn := r.SimulateNodeRemoval(nodeName, destinationMap, timestamp, remainingPdbTracker)
if rn != nil {
result = append(result, *rn)
} else if urn != nil {
@@ -146,7 +146,7 @@ func (r *RemovalSimulator) SimulateNodeRemoval(
nodeName string,
destinationMap map[string]bool,
timestamp time.Time,
pdbs []*policyv1.PodDisruptionBudget,
remainingPdbTracker pdb.RemainingPdbTracker,
) (*NodeToBeRemoved, *UnremovableNode) {
nodeInfo, err := r.clusterSnapshot.NodeInfos().Get(nodeName)
if err != nil {
@@ -159,7 +159,7 @@ func (r *RemovalSimulator) SimulateNodeRemoval(
return nil, &UnremovableNode{Node: nodeInfo.Node(), Reason: UnexpectedError}
}

podsToRemove, daemonSetPods, blockingPod, err := GetPodsToMove(nodeInfo, r.deleteOptions, r.listers, pdbs, timestamp)
podsToRemove, daemonSetPods, blockingPod, err := GetPodsToMove(nodeInfo, r.deleteOptions, r.listers, remainingPdbTracker, timestamp)
if err != nil {
klog.V(2).Infof("node %s cannot be removed: %v", nodeName, err)
if blockingPod != nil {
3 changes: 1 addition & 2 deletions cluster-autoscaler/simulator/cluster_test.go
@@ -30,7 +30,6 @@ import (
"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/kubelet/types"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
@@ -207,7 +206,7 @@ func TestFindNodesToRemove(t *testing.T) {
}
clustersnapshot.InitializeClusterSnapshotOrDie(t, clusterSnapshot, test.allNodes, test.pods)
r := NewRemovalSimulator(registry, clusterSnapshot, predicateChecker, tracker, testDeleteOptions(), false)
toRemove, unremovable := r.FindNodesToRemove(test.candidates, destinations, time.Now(), []*policyv1.PodDisruptionBudget{})
toRemove, unremovable := r.FindNodesToRemove(test.candidates, destinations, time.Now(), nil)
fmt.Printf("Test scenario: %s, found len(toRemove)=%v, expected len(test.toRemove)=%v\n", test.name, len(toRemove), len(test.toRemove))
assert.Equal(t, toRemove, test.toRemove)
assert.Equal(t, unremovable, test.unremovable)
44 changes: 22 additions & 22 deletions cluster-autoscaler/simulator/drain.go
@@ -25,7 +25,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/pdb"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
"k8s.io/autoscaler/cluster-autoscaler/utils/drain"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod"
Expand All @@ -44,7 +46,7 @@ type NodeDeleteOptions struct {
// to allow their pods deletion in scale down
MinReplicaCount int
// DrainabilityRules contain a list of checks that are used to verify whether a pod can be drained from node.
DrainabilityRules []drainability.Rule
DrainabilityRules rules.Rules
}

// NewNodeDeleteOptions returns new node delete options extracted from autoscaling options
Expand All @@ -54,7 +56,7 @@ func NewNodeDeleteOptions(opts config.AutoscalingOptions) NodeDeleteOptions {
SkipNodesWithLocalStorage: opts.SkipNodesWithLocalStorage,
MinReplicaCount: opts.MinReplicaCount,
SkipNodesWithCustomControllerPods: opts.SkipNodesWithCustomControllerPods,
DrainabilityRules: drainability.DefaultRules(),
DrainabilityRules: rules.Default(),
}
}

@@ -67,16 +69,22 @@ func NewNodeDeleteOptions(opts config.AutoscalingOptions) NodeDeleteOptions {
// still exist.
// TODO(x13n): Rewrite GetPodsForDeletionOnNodeDrain into a set of DrainabilityRules.
func GetPodsToMove(nodeInfo *schedulerframework.NodeInfo, deleteOptions NodeDeleteOptions, listers kube_util.ListerRegistry,
pdbs []*policyv1.PodDisruptionBudget, timestamp time.Time) (pods []*apiv1.Pod, daemonSetPods []*apiv1.Pod, blockingPod *drain.BlockingPod, err error) {
remainingPdbTracker pdb.RemainingPdbTracker, timestamp time.Time) (pods []*apiv1.Pod, daemonSetPods []*apiv1.Pod, blockingPod *drain.BlockingPod, err error) {
var drainPods, drainDs []*apiv1.Pod
drainabilityRules := deleteOptions.DrainabilityRules
if drainabilityRules == nil {
drainabilityRules = drainability.DefaultRules()
drainabilityRules = rules.Default()
}
if remainingPdbTracker == nil {
remainingPdbTracker = pdb.NewBasicRemainingPdbTracker()
}
drainCtx := &drainability.DrainContext{
RemainingPdbTracker: remainingPdbTracker,
}
for _, podInfo := range nodeInfo.Pods {
pod := podInfo.Pod
d := drainabilityStatus(pod, drainabilityRules)
switch d.Outcome {
status := drainabilityRules.Drainable(drainCtx, pod)
switch status.Outcome {
case drainability.UndefinedOutcome:
pods = append(pods, podInfo.Pod)
case drainability.DrainOk:
@@ -86,15 +94,18 @@ func GetPodsToMove(nodeInfo *schedulerframework.NodeInfo, deleteOptions NodeDele
drainPods = append(drainPods, pod)
}
case drainability.BlockDrain:
blockingPod = &drain.BlockingPod{pod, d.BlockingReason}
err = d.Error
blockingPod = &drain.BlockingPod{
Pod: pod,
Reason: status.BlockingReason,
}
err = status.Error
return
case drainability.SkipDrain:
}
}

pods, daemonSetPods, blockingPod, err = drain.GetPodsForDeletionOnNodeDrain(
pods,
pdbs,
remainingPdbTracker.GetPdbs(),
deleteOptions.SkipNodesWithSystemPods,
deleteOptions.SkipNodesWithLocalStorage,
deleteOptions.SkipNodesWithCustomControllerPods,
Expand All @@ -106,7 +117,7 @@ func GetPodsToMove(nodeInfo *schedulerframework.NodeInfo, deleteOptions NodeDele
if err != nil {
return pods, daemonSetPods, blockingPod, err
}
if pdbBlockingPod, err := checkPdbs(pods, pdbs); err != nil {
if pdbBlockingPod, err := checkPdbs(pods, remainingPdbTracker.GetPdbs()); err != nil {
return []*apiv1.Pod{}, []*apiv1.Pod{}, pdbBlockingPod, err
}

@@ -131,14 +142,3 @@ func checkPdbs(pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget) (*drain.
}
return nil, nil
}

func drainabilityStatus(pod *apiv1.Pod, dr []drainability.Rule) drainability.Status {
for _, f := range dr {
if d := f.Drainable(pod); d.Outcome != drainability.UndefinedOutcome {
return d
}
}
return drainability.Status{
Outcome: drainability.UndefinedOutcome,
}
}
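The removed drainabilityStatus helper above is the piece that actually gets extracted: its loop over individual rules presumably now lives on the rules.Rules type itself. The rules package is among the files not shown in this view, so the following is only an inferred sketch of its shape, reconstructed from the old loop and the new Drainable call site in GetPodsToMove; everything beyond the Rules, Default and Drainable names is an assumption.

package rules

import (
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability"
)

// Rule is a single drainability check (assumed interface shape).
type Rule interface {
	Drainable(*drainability.DrainContext, *apiv1.Pod) drainability.Status
}

// Rules is an ordered list of drainability checks.
type Rules []Rule

// Default returns the default rule set, replacing drainability.DefaultRules().
func Default() Rules {
	return Rules{ /* individual rule implementations live in subpackages */ }
}

// Drainable runs the rules in order and returns the first decisive outcome,
// mirroring the removed drainabilityStatus helper.
func (rs Rules) Drainable(drainCtx *drainability.DrainContext, pod *apiv1.Pod) drainability.Status {
	for _, r := range rs {
		if d := r.Drainable(drainCtx, pod); d.Outcome != drainability.UndefinedOutcome {
			return d
		}
	}
	return drainability.Status{Outcome: drainability.UndefinedOutcome}
}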
