Refactor NodeDeleteOptions for use in drainability rules
artemvmin committed Sep 28, 2023
1 parent 53e5b5f commit 5a2f46e
Showing 18 changed files with 164 additions and 111 deletions.
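For orientation before the file-by-file hunks, here is a minimal sketch of the call pattern this refactor introduces, assembled only from the signatures visible in the diffs below: NodeDeleteOptions moves from the simulator package to simulator/options, and a rules.Rules value is now threaded separately through NewScaleDown, NewActuator, NewDefaultEvictor, NewRemovalSimulator, planner.New, and simulator.GetPodsToMove. The schedulerframework alias for NodeInfo and the idea that nil rules fall back to default behavior (the updated tests pass nil) are assumptions, not confirmed by this commit.

package example

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/simulator"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/options"

	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// podsToEvict shows the post-refactor call shape of simulator.GetPodsToMove.
func podsToEvict(nodeInfo *schedulerframework.NodeInfo) error {
	// NodeDeleteOptions now lives in simulator/options; the fields match the
	// ones set in the legacy_test.go hunk below.
	deleteOptions := options.NodeDeleteOptions{
		SkipNodesWithSystemPods:           true,
		SkipNodesWithLocalStorage:         true,
		MinReplicaCount:                   0,
		SkipNodesWithCustomControllerPods: true,
	}

	// Drainability rules travel as a separate argument. nil mirrors the
	// updated tests; treating nil as "default behavior" is an assumption.
	var drainabilityRules rules.Rules

	// Listers and the PDB tracker are nil, as in EvictDaemonSetPods below.
	podsToRemove, _, _, err := simulator.GetPodsToMove(nodeInfo, deleteOptions, drainabilityRules, nil, nil, time.Now())
	if err != nil {
		return err
	}
	_ = podsToRemove // candidates for eviction during scale-down
	return nil
}

Keeping the rules out of NodeDeleteOptions lets callers swap in custom drainability rules without widening the options struct again, which appears to be the point of threading rules.Rules through every constructor touched here.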
10 changes: 7 additions & 3 deletions cluster-autoscaler/core/autoscaler.go
@@ -31,8 +31,9 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/expander/factory"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
@@ -57,7 +58,8 @@ type AutoscalerOptions struct {
DebuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
RemainingPdbTracker pdb.RemainingPdbTracker
ScaleUpOrchestrator scaleup.Orchestrator
DeleteOptions simulator.NodeDeleteOptions
DeleteOptions options.NodeDeleteOptions
DrainabilityRules rules.Rules
}

// Autoscaler is the main component of CA which scales up/down node groups according to its configuration
@@ -90,7 +92,9 @@ func NewAutoscaler(opts AutoscalerOptions) (Autoscaler, errors.AutoscalerError)
opts.DebuggingSnapshotter,
opts.RemainingPdbTracker,
opts.ScaleUpOrchestrator,
opts.DeleteOptions), nil
opts.DeleteOptions,
opts.DrainabilityRules,
), nil
}

// Initialize default options if not provided.
12 changes: 8 additions & 4 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -35,6 +35,8 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -47,7 +49,8 @@ type Actuator struct {
clusterState *clusterstate.ClusterStateRegistry
nodeDeletionTracker *deletiontracker.NodeDeletionTracker
nodeDeletionScheduler *GroupDeletionScheduler
deleteOptions simulator.NodeDeleteOptions
deleteOptions options.NodeDeleteOptions
drainabilityRules rules.Rules
// TODO: Move budget processor to scaledown planner, potentially merge into PostFilteringScaleDownNodeProcessor
// This is a larger change to the code structure which impacts some existing actuator unit tests
// as well as Cluster Autoscaler implementations that may override ScaleDownSetProcessor
@@ -64,15 +67,16 @@ type actuatorNodeGroupConfigGetter interface {
}

// NewActuator returns a new instance of Actuator.
func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker, deleteOptions simulator.NodeDeleteOptions, configGetter actuatorNodeGroupConfigGetter) *Actuator {
func NewActuator(ctx *context.AutoscalingContext, csr *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, configGetter actuatorNodeGroupConfigGetter) *Actuator {
ndb := NewNodeDeletionBatcher(ctx, csr, ndt, ctx.NodeDeletionBatcherInterval)
return &Actuator{
ctx: ctx,
clusterState: csr,
nodeDeletionTracker: ndt,
nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, NewDefaultEvictor(deleteOptions, ndt)),
nodeDeletionScheduler: NewGroupDeletionScheduler(ctx, ndt, ndb, NewDefaultEvictor(deleteOptions, drainabilityRules, ndt)),
budgetProcessor: budgets.NewScaleDownBudgetProcessor(ctx),
deleteOptions: deleteOptions,
drainabilityRules: drainabilityRules,
configGetter: configGetter,
nodeDeleteDelayAfterTaint: ctx.NodeDeleteDelayAfterTaint,
}
@@ -273,7 +277,7 @@ func (a *Actuator) deleteNodesAsync(nodes []*apiv1.Node, nodeGroup cloudprovider
continue
}

podsToRemove, _, _, err := simulator.GetPodsToMove(nodeInfo, a.deleteOptions, registry, remainingPdbTracker, time.Now())
podsToRemove, _, _, err := simulator.GetPodsToMove(nodeInfo, a.deleteOptions, a.drainabilityRules, registry, remainingPdbTracker, time.Now())
if err != nil {
klog.Errorf("Scale-down: couldn't delete node %q, err: %v", node.Name, err)
nodeDeleteResult := status.NodeDeleteResult{ResultType: status.NodeDeleteErrorInternal, Err: errors.NewAutoscalerError(errors.InternalError, "GetPodsToMove for %q returned error: %v", node.Name, err)}
10 changes: 7 additions & 3 deletions cluster-autoscaler/core/scaledown/actuation/drain.go
@@ -32,6 +32,8 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/utils/daemonset"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod"
@@ -61,18 +63,20 @@ type Evictor struct {
DsEvictionEmptyNodeTimeout time.Duration
PodEvictionHeadroom time.Duration
evictionRegister evictionRegister
deleteOptions simulator.NodeDeleteOptions
deleteOptions options.NodeDeleteOptions
drainabilityRules rules.Rules
}

// NewDefaultEvictor returns an instance of Evictor using the default parameters.
func NewDefaultEvictor(deleteOptions simulator.NodeDeleteOptions, evictionRegister evictionRegister) Evictor {
func NewDefaultEvictor(deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules, evictionRegister evictionRegister) Evictor {
return Evictor{
EvictionRetryTime: DefaultEvictionRetryTime,
DsEvictionRetryTime: DefaultDsEvictionRetryTime,
DsEvictionEmptyNodeTimeout: DefaultDsEvictionEmptyNodeTimeout,
PodEvictionHeadroom: DefaultPodEvictionHeadroom,
evictionRegister: evictionRegister,
deleteOptions: deleteOptions,
drainabilityRules: drainabilityRules,
}
}

@@ -176,7 +180,7 @@ func (e Evictor) DrainNodeWithPods(ctx *acontext.AutoscalingContext, node *apiv1
// EvictDaemonSetPods creates eviction objects for all DaemonSet pods on the node.
func (e Evictor) EvictDaemonSetPods(ctx *acontext.AutoscalingContext, nodeInfo *framework.NodeInfo, timeNow time.Time) error {
nodeToDelete := nodeInfo.Node()
_, daemonSetPods, _, err := simulator.GetPodsToMove(nodeInfo, e.deleteOptions, nil, nil, timeNow)
_, daemonSetPods, _, err := simulator.GetPodsToMove(nodeInfo, e.deleteOptions, e.drainabilityRules, nil, nil, timeNow)
if err != nil {
return fmt.Errorf("failed to get DaemonSet pods for %s (error: %v)", nodeToDelete.Name, err)
}
6 changes: 4 additions & 2 deletions cluster-autoscaler/core/scaledown/legacy/legacy.go
@@ -32,6 +32,8 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"

@@ -55,9 +57,9 @@ type ScaleDown struct {
}

// NewScaleDown builds new ScaleDown object.
func NewScaleDown(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, ndt *deletiontracker.NodeDeletionTracker, deleteOptions simulator.NodeDeleteOptions) *ScaleDown {
func NewScaleDown(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, ndt *deletiontracker.NodeDeletionTracker, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules) *ScaleDown {
usageTracker := simulator.NewUsageTracker()
removalSimulator := simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, usageTracker, deleteOptions, false)
removalSimulator := simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, usageTracker, deleteOptions, drainabilityRules, false)
unremovableNodes := unremovable.NewNodes()
resourceLimitsFinder := resource.NewLimitsFinder(processors.CustomResourcesProcessor)
return &ScaleDown{
8 changes: 4 additions & 4 deletions cluster-autoscaler/core/scaledown/legacy/legacy_test.go
@@ -25,6 +25,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
autoscaler_errors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"

appsv1 "k8s.io/api/apps/v1"
@@ -1287,14 +1288,13 @@ func newWrapperForTesting(ctx *context.AutoscalingContext, clusterStateRegistry
if ndt == nil {
ndt = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
}
deleteOptions := simulator.NodeDeleteOptions{
deleteOptions := options.NodeDeleteOptions{
SkipNodesWithSystemPods: true,
SkipNodesWithLocalStorage: true,
MinReplicaCount: 0,
SkipNodesWithCustomControllerPods: true,
}
processors := NewTestProcessors(ctx)
sd := NewScaleDown(ctx, processors, ndt, deleteOptions)
actuator := actuation.NewActuator(ctx, clusterStateRegistry, ndt, deleteOptions, processors.NodeGroupConfigProcessor)
sd := NewScaleDown(ctx, processors, ndt, deleteOptions, nil)
actuator := actuation.NewActuator(ctx, clusterStateRegistry, ndt, deleteOptions, nil, processors.NodeGroupConfigProcessor)
return NewScaleDownWrapper(sd, actuator)
}
6 changes: 4 additions & 2 deletions cluster-autoscaler/core/scaledown/planner/planner.go
@@ -34,6 +34,8 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/processors/nodes"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
@@ -77,7 +79,7 @@ type Planner struct {
}

// New creates a new Planner object.
func New(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, deleteOptions simulator.NodeDeleteOptions) *Planner {
func New(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, deleteOptions options.NodeDeleteOptions, drainabilityRules rules.Rules) *Planner {
resourceLimitsFinder := resource.NewLimitsFinder(processors.CustomResourcesProcessor)
minUpdateInterval := context.AutoscalingOptions.NodeGroupDefaults.ScaleDownUnneededTime
if minUpdateInterval == 0*time.Nanosecond {
@@ -87,7 +89,7 @@ func New(context *context.AutoscalingContext, processors *processors.Autoscaling
context: context,
unremovableNodes: unremovable.NewNodes(),
unneededNodes: unneeded.NewNodes(processors.NodeGroupConfigProcessor, resourceLimitsFinder),
rs: simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, simulator.NewUsageTracker(), deleteOptions, true),
rs: simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, simulator.NewUsageTracker(), deleteOptions, drainabilityRules, true),
actuationInjector: scheduling.NewHintingSimulator(context.PredicateChecker),
eligibilityChecker: eligibility.NewChecker(processors.NodeGroupConfigProcessor),
nodeUtilizationMap: make(map[string]utilization.Info),
13 changes: 7 additions & 6 deletions cluster-autoscaler/core/scaledown/planner/planner_test.go
@@ -37,6 +37,7 @@ import (
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
@@ -492,8 +493,8 @@ func TestUpdateClusterState(t *testing.T) {
}, &fake.Clientset{}, registry, provider, nil, nil)
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, tc.nodes, tc.pods)
deleteOptions := simulator.NodeDeleteOptions{}
p := New(&context, NewTestProcessors(&context), deleteOptions)
deleteOptions := options.NodeDeleteOptions{}
p := New(&context, NewTestProcessors(&context), deleteOptions, nil)
p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(tc.eligible)}
if tc.isSimulationTimeout {
context.AutoscalingOptions.ScaleDownSimulationTimeout = 1 * time.Second
@@ -611,8 +612,8 @@ func TestUpdateClusterStatUnneededNodesLimit(t *testing.T) {
}, &fake.Clientset{}, nil, provider, nil, nil)
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, nodes, nil)
deleteOptions := simulator.NodeDeleteOptions{}
p := New(&context, NewTestProcessors(&context), deleteOptions)
deleteOptions := options.NodeDeleteOptions{}
p := New(&context, NewTestProcessors(&context), deleteOptions, nil)
p.eligibilityChecker = &fakeEligibilityChecker{eligible: asMap(nodeNames(nodes))}
p.minUpdateInterval = tc.updateInterval
p.unneededNodes.Update(previouslyUnneeded, time.Now())
@@ -779,8 +780,8 @@ func TestNodesToDelete(t *testing.T) {
}, &fake.Clientset{}, nil, provider, nil, nil)
assert.NoError(t, err)
clustersnapshot.InitializeClusterSnapshotOrDie(t, context.ClusterSnapshot, allNodes, nil)
deleteOptions := simulator.NodeDeleteOptions{}
p := New(&context, NewTestProcessors(&context), deleteOptions)
deleteOptions := options.NodeDeleteOptions{}
p := New(&context, NewTestProcessors(&context), deleteOptions, nil)
p.latestUpdate = time.Now()
p.actuationStatus = deletiontracker.NewNodeDeletionTracker(0 * time.Second)
p.unneededNodes.Update(allRemovables, time.Now().Add(-1*time.Hour))
11 changes: 7 additions & 4 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -51,6 +51,8 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
@@ -142,7 +144,8 @@ func NewStaticAutoscaler(
debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter,
remainingPdbTracker pdb.RemainingPdbTracker,
scaleUpOrchestrator scaleup.Orchestrator,
deleteOptions simulator.NodeDeleteOptions) *StaticAutoscaler {
deleteOptions options.NodeDeleteOptions,
drainabilityRules rules.Rules) *StaticAutoscaler {

clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
MaxTotalUnreadyPercentage: opts.MaxTotalUnreadyPercentage,
@@ -169,14 +172,14 @@
// TODO: Populate the ScaleDownActuator/Planner fields in AutoscalingContext
// during the struct creation rather than here.
ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second)
scaleDown := legacy.NewScaleDown(autoscalingContext, processors, ndt, deleteOptions)
actuator := actuation.NewActuator(autoscalingContext, clusterStateRegistry, ndt, deleteOptions, processors.NodeGroupConfigProcessor)
scaleDown := legacy.NewScaleDown(autoscalingContext, processors, ndt, deleteOptions, drainabilityRules)
actuator := actuation.NewActuator(autoscalingContext, clusterStateRegistry, ndt, deleteOptions, drainabilityRules, processors.NodeGroupConfigProcessor)
autoscalingContext.ScaleDownActuator = actuator

var scaleDownPlanner scaledown.Planner
var scaleDownActuator scaledown.Actuator
if opts.ParallelDrain {
scaleDownPlanner = planner.New(autoscalingContext, processors, deleteOptions)
scaleDownPlanner = planner.New(autoscalingContext, processors, deleteOptions, drainabilityRules)
scaleDownActuator = actuator
} else {
// TODO: Remove the wrapper once the legacy implementation becomes obsolete.
(Diffs for the remaining changed files are not shown in this view.)
