From f0216e7f592a147da2d2b9c948f8b8c5731b6d6c Mon Sep 17 00:00:00 2001 From: Maksym Fuhol Date: Tue, 27 Feb 2024 12:51:37 +0000 Subject: [PATCH] Refactor StartDeletion usage patterns and enforce periodic scaledown status processor calls. --- .../currently_drained_nodes_test.go | 8 +- .../core/scaledown/actuation/actuator.go | 28 +- .../core/scaledown/actuation/actuator_test.go | 32 +- .../deletiontracker/nodedeletiontracker.go | 3 +- .../core/scaledown/legacy/legacy_test.go | 20 +- .../core/scaledown/legacy/wrapper.go | 20 +- .../core/scaledown/scaledown.go | 5 +- cluster-autoscaler/core/static_autoscaler.go | 31 +- .../core/static_autoscaler_test.go | 278 +++++++++++++++++- 9 files changed, 341 insertions(+), 84 deletions(-) diff --git a/cluster-autoscaler/core/podlistprocessor/currently_drained_nodes_test.go b/cluster-autoscaler/core/podlistprocessor/currently_drained_nodes_test.go index 7b01fffd920..678afbd0570 100644 --- a/cluster-autoscaler/core/podlistprocessor/currently_drained_nodes_test.go +++ b/cluster-autoscaler/core/podlistprocessor/currently_drained_nodes_test.go @@ -269,8 +269,8 @@ type mockActuator struct { status *mockActuationStatus } -func (m *mockActuator) StartDeletion(_, _ []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError) { - return nil, nil +func (m *mockActuator) StartDeletion(_, _ []*apiv1.Node) (status.ScaleDownResult, []*status.ScaleDownNode, errors.AutoscalerError) { + return status.ScaleDownError, []*status.ScaleDownNode{}, nil } func (m *mockActuator) CheckStatus() scaledown.ActuationStatus { @@ -281,6 +281,10 @@ func (m *mockActuator) ClearResultsNotNewerThan(time.Time) { } +func (m *mockActuator) DeletionResults() (map[string]status.NodeDeleteResult, time.Time) { + return map[string]status.NodeDeleteResult{}, time.Now() +} + type mockActuationStatus struct { drainedNodes []string } diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator.go b/cluster-autoscaler/core/scaledown/actuation/actuator.go index f5854385b0e..3fc56064d8e 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator.go @@ -96,47 +96,47 @@ func (a *Actuator) ClearResultsNotNewerThan(t time.Time) { a.nodeDeletionTracker.ClearResultsNotNewerThan(t) } +// DeletionResults returns deletion results since the last ClearResultsNotNewerThan call +// in a map form, along with the timestamp of last result. +func (a *Actuator) DeletionResults() (map[string]status.NodeDeleteResult, time.Time) { + return a.nodeDeletionTracker.DeletionResults() +} + // StartDeletion triggers a new deletion process. 
-func (a *Actuator) StartDeletion(empty, drain []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError) { +func (a *Actuator) StartDeletion(empty, drain []*apiv1.Node) (status.ScaleDownResult, []*status.ScaleDownNode, errors.AutoscalerError) { a.nodeDeletionScheduler.ResetAndReportMetrics() deletionStartTime := time.Now() defer func() { metrics.UpdateDuration(metrics.ScaleDownNodeDeletion, time.Since(deletionStartTime)) }() - results, ts := a.nodeDeletionTracker.DeletionResults() - scaleDownStatus := &status.ScaleDownStatus{NodeDeleteResults: results, NodeDeleteResultsAsOf: ts} - + scaledDownNodes := make([]*status.ScaleDownNode, 0) emptyToDelete, drainToDelete := a.budgetProcessor.CropNodes(a.nodeDeletionTracker, empty, drain) if len(emptyToDelete) == 0 && len(drainToDelete) == 0 { - scaleDownStatus.Result = status.ScaleDownNoNodeDeleted - return scaleDownStatus, nil + return status.ScaleDownNoNodeDeleted, nil, nil } if len(emptyToDelete) > 0 { // Taint all empty nodes synchronously if err := a.taintNodesSync(emptyToDelete); err != nil { - scaleDownStatus.Result = status.ScaleDownError - return scaleDownStatus, err + return status.ScaleDownError, scaledDownNodes, err } emptyScaledDown := a.deleteAsyncEmpty(emptyToDelete) - scaleDownStatus.ScaledDownNodes = append(scaleDownStatus.ScaledDownNodes, emptyScaledDown...) + scaledDownNodes = append(scaledDownNodes, emptyScaledDown...) } if len(drainToDelete) > 0 { // Taint all nodes that need drain synchronously, but don't start any drain/deletion yet. Otherwise, pods evicted from one to-be-deleted node // could get recreated on another. if err := a.taintNodesSync(drainToDelete); err != nil { - scaleDownStatus.Result = status.ScaleDownError - return scaleDownStatus, err + return status.ScaleDownError, scaledDownNodes, err } // All nodes involved in the scale-down should be tainted now - start draining and deleting nodes asynchronously. drainScaledDown := a.deleteAsyncDrain(drainToDelete) - scaleDownStatus.ScaledDownNodes = append(scaleDownStatus.ScaledDownNodes, drainScaledDown...) + scaledDownNodes = append(scaledDownNodes, drainScaledDown...) } - scaleDownStatus.Result = status.ScaleDownNodeDeleteStarted - return scaleDownStatus, nil + return status.ScaleDownNodeDeleteStarted, scaledDownNodes, nil } // deleteAsyncEmpty immediately starts deletions asynchronously. 
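
A minimal sketch (not part of the patch) of how a caller is now expected to combine the reworked pieces: StartDeletion reports only the outcome of the current attempt, while results of previously started deletions are pulled separately via DeletionResults and pruned with ClearResultsNotNewerThan. The helper name buildScaleDownStatus is hypothetical and the import paths are assumed; the actual wiring is the one added to static_autoscaler.go further down in this patch.

	import (
		apiv1 "k8s.io/api/core/v1"

		"k8s.io/autoscaler/cluster-autoscaler/core/scaledown"
		"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
		"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
	)

	// buildScaleDownStatus (hypothetical helper) assembles a full ScaleDownStatus
	// from the values the refactored Actuator now exposes separately.
	func buildScaleDownStatus(actuator scaledown.Actuator, empty, drain []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError) {
		// The returned result and nodes describe only this StartDeletion attempt.
		result, scaledDownNodes, err := actuator.StartDeletion(empty, drain)

		// Results of deletions started in earlier iterations are fetched on their own,
		// so they can be reported even when no new scale-down is attempted.
		nodeDeleteResults, asOf := actuator.DeletionResults()
		actuator.ClearResultsNotNewerThan(asOf)

		return &status.ScaleDownStatus{
			Result:                result,
			ScaledDownNodes:       scaledDownNodes,
			NodeDeleteResults:     nodeDeleteResults,
			NodeDeleteResultsAsOf: asOf,
		}, err
	}
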
diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go index 3f468ceb4a2..81a1abd2df1 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go @@ -1174,9 +1174,7 @@ func TestStartDeletion(t *testing.T) { } } - wantScaleDownStatus := &status.ScaleDownStatus{ - Result: tc.wantStatus.result, - } + wantScaleDownNodes := []*status.ScaleDownNode{} for _, scaleDownNodeInfo := range tc.wantStatus.scaledDownNodes { statusScaledDownNode := &status.ScaleDownNode{ Node: generateNode(scaleDownNodeInfo.name), @@ -1184,7 +1182,7 @@ func TestStartDeletion(t *testing.T) { EvictedPods: scaleDownNodeInfo.evictedPods, UtilInfo: scaleDownNodeInfo.utilInfo, } - wantScaleDownStatus.ScaledDownNodes = append(wantScaleDownStatus.ScaledDownNodes, statusScaledDownNode) + wantScaleDownNodes = append(wantScaleDownNodes, statusScaledDownNode) } scaleStateNotifier := nodegroupchange.NewNodeGroupChangeObserversList() @@ -1201,18 +1199,22 @@ func TestStartDeletion(t *testing.T) { budgetProcessor: budgets.NewScaleDownBudgetProcessor(&ctx), configGetter: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(ctx.NodeGroupDefaults), } - gotStatus, gotErr := actuator.StartDeletion(allEmptyNodes, allDrainNodes) + gotResult, gotScaleDownNodes, gotErr := actuator.StartDeletion(allEmptyNodes, allDrainNodes) if diff := cmp.Diff(tc.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" { t.Errorf("StartDeletion error diff (-want +got):\n%s", diff) } - // Verify ScaleDownStatus looks as expected. + // Verify ScaleDownResult looks as expected. + if diff := cmp.Diff(tc.wantStatus.result, gotResult); diff != "" { + t.Errorf("StartDeletion result diff (-want +got):\n%s", diff) + } + + // Verify ScaleDownNodes looks as expected. ignoreSdNodeOrder := cmpopts.SortSlices(func(a, b *status.ScaleDownNode) bool { return a.Node.Name < b.Node.Name }) - ignoreTimestamps := cmpopts.IgnoreFields(status.ScaleDownStatus{}, "NodeDeleteResultsAsOf") cmpNg := cmp.Comparer(func(a, b *testprovider.TestNodeGroup) bool { return a.Id() == b.Id() }) - statusCmpOpts := cmp.Options{ignoreSdNodeOrder, ignoreTimestamps, cmpNg, cmpopts.EquateEmpty()} - if diff := cmp.Diff(wantScaleDownStatus, gotStatus, statusCmpOpts); diff != "" { - t.Errorf("StartDeletion status diff (-want +got):\n%s", diff) + statusCmpOpts := cmp.Options{ignoreSdNodeOrder, cmpNg, cmpopts.EquateEmpty()} + if diff := cmp.Diff(wantScaleDownNodes, gotScaleDownNodes, statusCmpOpts); diff != "" { + t.Errorf("StartDeletion scaled down nodes diff (-want +got):\n%s", diff) } // Verify that all expected nodes were deleted using the cloud provider hook. @@ -1278,13 +1280,9 @@ func TestStartDeletion(t *testing.T) { t.Errorf("Timeout while waiting for node deletion results") } - // Run StartDeletion again to gather node deletion results for deletions started in the previous call, and verify - // that they look as expected. - gotNextStatus, gotNextErr := actuator.StartDeletion(nil, nil) - if gotNextErr != nil { - t.Errorf("StartDeletion unexpected error: %v", gotNextErr) - } - if diff := cmp.Diff(tc.wantNodeDeleteResults, gotNextStatus.NodeDeleteResults, cmpopts.EquateEmpty(), cmpopts.EquateErrors()); diff != "" { + // Gather node deletion results for deletions started in the previous call, and verify that they look as expected. 
+ nodeDeleteResults, _ := actuator.DeletionResults() + if diff := cmp.Diff(tc.wantNodeDeleteResults, nodeDeleteResults, cmpopts.EquateEmpty(), cmpopts.EquateErrors()); diff != "" { t.Errorf("NodeDeleteResults diff (-want +got):\n%s", diff) } }) diff --git a/cluster-autoscaler/core/scaledown/deletiontracker/nodedeletiontracker.go b/cluster-autoscaler/core/scaledown/deletiontracker/nodedeletiontracker.go index d3d5a53b440..543f1cf1cd3 100644 --- a/cluster-autoscaler/core/scaledown/deletiontracker/nodedeletiontracker.go +++ b/cluster-autoscaler/core/scaledown/deletiontracker/nodedeletiontracker.go @@ -148,7 +148,8 @@ func (n *NodeDeletionTracker) DeletionsCount(nodeGroupId string) int { return n.deletionsPerNodeGroup[nodeGroupId] } -// DeletionResults returns deletion results in a map form, along with the timestamp of last result. +// DeletionResults returns deletion results since the last ClearResultsNotNewerThan call +// in a map form, along with the timestamp of last result. func (n *NodeDeletionTracker) DeletionResults() (map[string]status.NodeDeleteResult, time.Time) { n.Lock() defer n.Unlock() diff --git a/cluster-autoscaler/core/scaledown/legacy/legacy_test.go b/cluster-autoscaler/core/scaledown/legacy/legacy_test.go index 1ba61e250df..84eb653c234 100644 --- a/cluster-autoscaler/core/scaledown/legacy/legacy_test.go +++ b/cluster-autoscaler/core/scaledown/legacy/legacy_test.go @@ -779,10 +779,10 @@ func TestScaleDown(t *testing.T) { autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-5*time.Minute)) assert.NoError(t, autoscalererr) empty, drain := wrapper.NodesToDelete(time.Now()) - scaleDownStatus, err := wrapper.StartDeletion(empty, drain) + scaleDownResult, _, err := wrapper.StartDeletion(empty, drain) waitForDeleteToFinish(t, wrapper) assert.NoError(t, err) - assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownStatus.Result) + assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownResult) assert.Equal(t, n1.Name, utils.GetStringFromChan(deletedNodes)) assert.Equal(t, n1.Name, utils.GetStringFromChan(updatedNodes)) } @@ -1036,7 +1036,7 @@ func simpleScaleDownEmpty(t *testing.T, config *ScaleTestConfig) { autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-5*time.Minute)) assert.NoError(t, autoscalererr) empty, drain := wrapper.NodesToDelete(time.Now()) - scaleDownStatus, err := wrapper.StartDeletion(empty, drain) + scaleDownResult, _, err := wrapper.StartDeletion(empty, drain) assert.NoError(t, err) var expectedScaleDownResult status.ScaleDownResult @@ -1045,7 +1045,7 @@ func simpleScaleDownEmpty(t *testing.T, config *ScaleTestConfig) { } else { expectedScaleDownResult = status.ScaleDownNoUnneeded } - assert.Equal(t, expectedScaleDownResult, scaleDownStatus.Result) + assert.Equal(t, expectedScaleDownResult, scaleDownResult) expectedScaleDownCount := config.ExpectedScaleDownCount if config.ExpectedScaleDownCount == 0 { @@ -1131,11 +1131,11 @@ func TestNoScaleDownUnready(t *testing.T) { autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-5*time.Minute)) assert.NoError(t, autoscalererr) empty, drain := wrapper.NodesToDelete(time.Now()) - scaleDownStatus, err := wrapper.StartDeletion(empty, drain) + scaleDownResult, _, err := wrapper.StartDeletion(empty, drain) waitForDeleteToFinish(t, wrapper) assert.NoError(t, err) - assert.Equal(t, status.ScaleDownNoUnneeded, scaleDownStatus.Result) + assert.Equal(t, status.ScaleDownNoUnneeded, scaleDownResult) deletedNodes := make(chan string, 10) @@ -1155,11 
+1155,11 @@ func TestNoScaleDownUnready(t *testing.T) { autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-2*time.Hour)) assert.NoError(t, autoscalererr) empty, drain = wrapper.NodesToDelete(time.Now()) - scaleDownStatus, err = wrapper.StartDeletion(empty, drain) + scaleDownResult, _, err = wrapper.StartDeletion(empty, drain) waitForDeleteToFinish(t, wrapper) assert.NoError(t, err) - assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownStatus.Result) + assert.Equal(t, status.ScaleDownNodeDeleteStarted, scaleDownResult) assert.Equal(t, n1.Name, utils.GetStringFromChan(deletedNodes)) } @@ -1245,11 +1245,11 @@ func TestScaleDownNoMove(t *testing.T) { autoscalererr = wrapper.UpdateClusterState(nodes, nodes, nil, time.Now().Add(-5*time.Minute)) assert.NoError(t, autoscalererr) empty, drain := wrapper.NodesToDelete(time.Now()) - scaleDownStatus, err := wrapper.StartDeletion(empty, drain) + scaleDownResult, _, err := wrapper.StartDeletion(empty, drain) waitForDeleteToFinish(t, wrapper) assert.NoError(t, err) - assert.Equal(t, status.ScaleDownNoUnneeded, scaleDownStatus.Result) + assert.Equal(t, status.ScaleDownNoUnneeded, scaleDownResult) } func getCountOfChan(c chan string) int { diff --git a/cluster-autoscaler/core/scaledown/legacy/wrapper.go b/cluster-autoscaler/core/scaledown/legacy/wrapper.go index d9bbfd12dca..36ccad90ed2 100644 --- a/cluster-autoscaler/core/scaledown/legacy/wrapper.go +++ b/cluster-autoscaler/core/scaledown/legacy/wrapper.go @@ -89,20 +89,12 @@ func (p *ScaleDownWrapper) NodesToDelete(currentTime time.Time) (empty, needDrai } // StartDeletion triggers an actual scale down logic. -func (p *ScaleDownWrapper) StartDeletion(empty, needDrain []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError) { +func (p *ScaleDownWrapper) StartDeletion(empty, needDrain []*apiv1.Node) (status.ScaleDownResult, []*status.ScaleDownNode, errors.AutoscalerError) { // Done to preserve legacy behavior, see comment on NodesToDelete. if p.lastNodesToDeleteErr != nil || p.lastNodesToDeleteResult != status.ScaleDownNodeDeleteStarted { - // When there is no need for scale-down, p.lastNodesToDeleteResult is set to ScaleDownNoUnneeded. We have to still report node delete - // results in this case, otherwise they wouldn't get reported until the next call to actuator.StartDeletion (i.e. until the next scale-down - // attempt). - // Run actuator.StartDeletion with no nodes just to grab the delete results. - origStatus, _ := p.actuator.StartDeletion(nil, nil) - return &status.ScaleDownStatus{ - Result: p.lastNodesToDeleteResult, - NodeDeleteResults: origStatus.NodeDeleteResults, - NodeDeleteResultsAsOf: origStatus.NodeDeleteResultsAsOf, - }, p.lastNodesToDeleteErr + return p.lastNodesToDeleteResult, []*status.ScaleDownNode{}, p.lastNodesToDeleteErr } + return p.actuator.StartDeletion(empty, needDrain) } @@ -116,3 +108,9 @@ func (p *ScaleDownWrapper) CheckStatus() scaledown.ActuationStatus { func (p *ScaleDownWrapper) ClearResultsNotNewerThan(t time.Time) { p.actuator.ClearResultsNotNewerThan(t) } + +// DeletionResults returns deletion results since the last ClearResultsNotNewerThan call +// in a map form, along with the timestamp of last result. 
+func (p *ScaleDownWrapper) DeletionResults() (map[string]status.NodeDeleteResult, time.Time) { + return p.actuator.DeletionResults() +} diff --git a/cluster-autoscaler/core/scaledown/scaledown.go b/cluster-autoscaler/core/scaledown/scaledown.go index 434ea931299..b73a4361a24 100644 --- a/cluster-autoscaler/core/scaledown/scaledown.go +++ b/cluster-autoscaler/core/scaledown/scaledown.go @@ -56,12 +56,15 @@ type Actuator interface { // function are not guaranteed to be deleted, it is possible for the // Actuator to ignore some of them e.g. if max configured level of // parallelism is reached. - StartDeletion(empty, needDrain []*apiv1.Node) (*status.ScaleDownStatus, errors.AutoscalerError) + StartDeletion(empty, needDrain []*apiv1.Node) (status.ScaleDownResult, []*status.ScaleDownNode, errors.AutoscalerError) // CheckStatus returns an immutable snapshot of ongoing deletions. CheckStatus() ActuationStatus // ClearResultsNotNewerThan removes information about deletions finished // before or exactly at the provided timestamp. ClearResultsNotNewerThan(time.Time) + // DeletionResults returns deletion results since the last ClearResultsNotNewerThan call + // in a map form, along with the timestamp of last result. + DeletionResults() (map[string]status.NodeDeleteResult, time.Time) } // ActuationStatus is used for feeding Actuator status back into Planner diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 262723e1db6..6f056925e7f 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -388,7 +388,6 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr scaleUpStatus := &status.ScaleUpStatus{Result: status.ScaleUpNotTried} scaleUpStatusProcessorAlreadyCalled := false scaleDownStatus := &scaledownstatus.ScaleDownStatus{Result: scaledownstatus.ScaleDownNotTried} - scaleDownStatusProcessorAlreadyCalled := false defer func() { // Update status information when the loop is done (regardless of reason) @@ -403,14 +402,22 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr if !scaleUpStatusProcessorAlreadyCalled && a.processors != nil && a.processors.ScaleUpStatusProcessor != nil { a.processors.ScaleUpStatusProcessor.Process(a.AutoscalingContext, scaleUpStatus) } - if !scaleDownStatusProcessorAlreadyCalled && a.processors != nil && a.processors.ScaleDownStatusProcessor != nil { + if a.processors != nil && a.processors.ScaleDownStatusProcessor != nil { + // Gather status before scaledown status processor invocation + nodeDeletionResults, nodeDeletionResultsAsOf := a.scaleDownActuator.DeletionResults() + scaleDownStatus.NodeDeleteResults = nodeDeletionResults + scaleDownStatus.NodeDeleteResultsAsOf = nodeDeletionResultsAsOf + a.scaleDownActuator.ClearResultsNotNewerThan(scaleDownStatus.NodeDeleteResultsAsOf) scaleDownStatus.SetUnremovableNodesInfo(a.scaleDownPlanner.UnremovableNodes(), a.scaleDownPlanner.NodeUtilizationMap(), a.CloudProvider) + a.processors.ScaleDownStatusProcessor.Process(a.AutoscalingContext, scaleDownStatus) } - err := a.processors.AutoscalingStatusProcessor.Process(a.AutoscalingContext, a.clusterStateRegistry, currentTime) - if err != nil { - klog.Errorf("AutoscalingStatusProcessor error: %v.", err) + if a.processors != nil && a.processors.AutoscalingStatusProcessor != nil { + err := a.processors.AutoscalingStatusProcessor.Process(a.AutoscalingContext, a.clusterStateRegistry, currentTime) + if err != nil { + 
klog.Errorf("AutoscalingStatusProcessor error: %v.", err) + } } }() @@ -637,17 +644,15 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr if scaleDownInCooldown { scaleDownStatus.Result = scaledownstatus.ScaleDownInCooldown - if len(removedNodeGroups) > 0 { - a.processors.ScaleDownStatusProcessor.Process(autoscalingContext, scaleDownStatus) - } } else { klog.V(4).Infof("Starting scale down") scaleDownStart := time.Now() metrics.UpdateLastTime(metrics.ScaleDown, scaleDownStart) empty, needDrain := a.scaleDownPlanner.NodesToDelete(currentTime) - scaleDownStatus, typedErr := a.scaleDownActuator.StartDeletion(empty, needDrain) - a.scaleDownActuator.ClearResultsNotNewerThan(scaleDownStatus.NodeDeleteResultsAsOf) + scaleDownResult, scaledDownNodes, typedErr := a.scaleDownActuator.StartDeletion(empty, needDrain) + scaleDownStatus.Result = scaleDownResult + scaleDownStatus.ScaledDownNodes = scaledDownNodes metrics.UpdateDurationFromStart(metrics.ScaleDown, scaleDownStart) metrics.UpdateUnremovableNodesCount(countsByReason(a.scaleDownPlanner.UnremovableNodes())) @@ -673,12 +678,6 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr actuation.UpdateSoftDeletionTaints(a.AutoscalingContext, taintableNodes, untaintableNodes) } - if a.processors != nil && a.processors.ScaleDownStatusProcessor != nil { - scaleDownStatus.SetUnremovableNodesInfo(a.scaleDownPlanner.UnremovableNodes(), a.scaleDownPlanner.NodeUtilizationMap(), a.CloudProvider) - a.processors.ScaleDownStatusProcessor.Process(autoscalingContext, scaleDownStatus) - scaleDownStatusProcessorAlreadyCalled = true - } - if typedErr != nil { klog.Errorf("Failed to scale down: %v", typedErr) a.lastScaleDownFailTime = currentTime diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index 6e31f580393..8635f61199e 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -38,6 +38,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker" "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/legacy" + "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status" "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator" . 
"k8s.io/autoscaler/cluster-autoscaler/core/test" core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils" @@ -70,6 +71,8 @@ import ( v1appslister "k8s.io/client-go/listers/apps/v1" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" klog "k8s.io/klog/v2" @@ -173,12 +176,26 @@ type scaleCall struct { delta int } +type scaleDownStatusProcessorMock struct { + called int + scaleDownStatus *status.ScaleDownStatus +} + +func (p *scaleDownStatusProcessorMock) Process(_ *context.AutoscalingContext, st *status.ScaleDownStatus) { + p.called += 1 + p.scaleDownStatus = st +} + +func (p *scaleDownStatusProcessorMock) CleanUp() { +} + type commonMocks struct { readyNodeLister *kube_util.TestNodeLister allNodeLister *kube_util.TestNodeLister allPodLister *podListerMock podDisruptionBudgetLister *podDisruptionBudgetListerMock daemonSetLister *daemonSetListerMock + nodeDeletionTracker *deletiontracker.NodeDeletionTracker onScaleUp *onScaleUpMock onScaleDown *onScaleDownMock @@ -270,7 +287,7 @@ func setupAutoscaler(config *autoscalerSetupConfig) (*StaticAutoscaler, error) { processors := NewTestProcessors(&context) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, config.mocks.nodeDeletionTracker) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) @@ -363,7 +380,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { } processors := NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) @@ -563,7 +580,7 @@ func TestStaticAutoscalerRunOnceWithScaleDownDelayPerNG(t *testing.T) { clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) processors.ScaleStateNotifier.Register(clusterState) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) @@ -789,7 +806,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { } clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, 
newEstimatorBuilder(), taints.TaintConfig{}) @@ -940,7 +957,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { processors := NewTestProcessors(&context) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) @@ -1089,7 +1106,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { processors := NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil) suOrchestrator := orchestrator.New() suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{}) @@ -1221,7 +1238,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T) processors := NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil) autoscaler := &StaticAutoscaler{ AutoscalingContext: &context, @@ -1320,7 +1337,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t * processors := NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) - sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState) + sdPlanner, sdActuator := newScaleDownPlannerAndActuator(&context, processors, clusterState, nil) autoscaler := &StaticAutoscaler{ AutoscalingContext: &context, @@ -2396,6 +2413,240 @@ func TestFilterOutYoungPods(t *testing.T) { } } +func TestStaticAutoscalerRunOnceInvokesScaleDownStatusProcessor(t *testing.T) { + options := config.AutoscalingOptions{ + NodeGroupDefaults: config.NodeGroupAutoscalingOptions{ + ScaleDownUnneededTime: -1 * time.Nanosecond, // enforce immediate scaledown/drain for ready + ScaleDownUnreadyTime: -1 * time.Nanosecond, // enforce immediate scaledown/drain for unready + ScaleDownUtilizationThreshold: 0.5, + MaxNodeProvisionTime: 10 * time.Second, + }, + EstimatorName: estimator.BinpackingEstimatorName, + ScaleDownEnabled: true, + MaxNodesTotal: 10, + MaxCoresTotal: 10, + MaxMemoryTotal: 100000, + } + now := time.Now() + n1 := BuildTestNode("n1", 1000, 1000) + SetNodeReadyState(n1, true, now) + n2 := BuildTestNode("n2", 1000, 1000) + SetNodeReadyState(n2, true, now) + + underUtilizedPod := BuildTestPod("p1", 20, 20, WithNodeName("n1")) + utilizedPod := BuildTestPod("p1", 800, 800, WithNodeName("n1")) + + testCases := map[string]struct { + pods []*apiv1.Pod + nodes []*apiv1.Node + fakeDeletionResults map[string]status.NodeDeleteResult + fakeDeletionResultsNodeGroup string + expectedStatus 
*status.ScaleDownStatus + }{ + "No scaledown candidates, utilized": { + pods: []*apiv1.Pod{utilizedPod}, + nodes: []*apiv1.Node{n1}, + expectedStatus: &status.ScaleDownStatus{ + Result: status.ScaleDownNoUnneeded, + ScaledDownNodes: []*status.ScaleDownNode{}, + UnremovableNodes: []*status.UnremovableNode{ + { + Node: n1, + BlockingPod: nil, + Reason: simulator.NotUnderutilized, + }, + }, + RemovedNodeGroups: []cloudprovider.NodeGroup{}, + NodeDeleteResults: map[string]status.NodeDeleteResult{}, + NodeDeleteResultsAsOf: time.Time{}, + }, + }, + "No scaledown candidates, underutilized, not replicated": { + pods: []*apiv1.Pod{underUtilizedPod}, + nodes: []*apiv1.Node{n1}, + expectedStatus: &status.ScaleDownStatus{ + Result: status.ScaleDownNoUnneeded, + ScaledDownNodes: []*status.ScaleDownNode{}, + UnremovableNodes: []*status.UnremovableNode{ + { + Node: n1, + Reason: simulator.BlockedByPod, + BlockingPod: &drain.BlockingPod{ + Pod: underUtilizedPod, + Reason: drain.NotReplicated, + }, + }, + }, + RemovedNodeGroups: []cloudprovider.NodeGroup{}, + NodeDeleteResults: map[string]status.NodeDeleteResult{}, + NodeDeleteResultsAsOf: time.Time{}, + }, + }, + "With scaledown candidate, empty node and utilized node": { + pods: []*apiv1.Pod{underUtilizedPod}, + nodes: []*apiv1.Node{n1, n2}, + expectedStatus: &status.ScaleDownStatus{ + Result: status.ScaleDownNodeDeleteStarted, + ScaledDownNodes: []*status.ScaleDownNode{ + { + Node: n2, + }, + }, + UnremovableNodes: []*status.UnremovableNode{ + { + Node: n1, + Reason: simulator.BlockedByPod, + BlockingPod: &drain.BlockingPod{ + Pod: underUtilizedPod, + Reason: drain.NotReplicated, + }, + }, + }, + RemovedNodeGroups: []cloudprovider.NodeGroup{}, + NodeDeleteResults: map[string]status.NodeDeleteResult{}, + NodeDeleteResultsAsOf: time.Time{}, + }, + }, + "With scaledown candidate, empty node": { + pods: []*apiv1.Pod{}, + nodes: []*apiv1.Node{n2}, + expectedStatus: &status.ScaleDownStatus{ + Result: status.ScaleDownNodeDeleteStarted, + ScaledDownNodes: []*status.ScaleDownNode{ + { + Node: n2, + }, + }, + UnremovableNodes: []*status.UnremovableNode{}, + RemovedNodeGroups: []cloudprovider.NodeGroup{}, + NodeDeleteResults: map[string]status.NodeDeleteResult{}, + NodeDeleteResultsAsOf: time.Time{}, + }, + }, + "No scaledown candidates, deletion success": { + pods: []*apiv1.Pod{utilizedPod}, + nodes: []*apiv1.Node{n1}, + fakeDeletionResults: map[string]status.NodeDeleteResult{"n1": { + Err: nil, + ResultType: status.NodeDeleteOk, + }}, + fakeDeletionResultsNodeGroup: "ng1", + expectedStatus: &status.ScaleDownStatus{ + Result: status.ScaleDownNoUnneeded, + ScaledDownNodes: []*status.ScaleDownNode{}, + UnremovableNodes: []*status.UnremovableNode{ + { + Node: n1, + BlockingPod: nil, + Reason: simulator.NotUnderutilized, + }, + }, + RemovedNodeGroups: []cloudprovider.NodeGroup{}, + NodeDeleteResults: map[string]status.NodeDeleteResult{"n1": { + Err: nil, + ResultType: status.NodeDeleteOk, + }}, + NodeDeleteResultsAsOf: time.Time{}, + }, + }, + "No scaledown candidates, deletion failed": { + pods: []*apiv1.Pod{utilizedPod}, + nodes: []*apiv1.Node{n1}, + fakeDeletionResults: map[string]status.NodeDeleteResult{"n1": { + Err: nil, + ResultType: status.NodeDeleteErrorInternal, + }}, + fakeDeletionResultsNodeGroup: "ng1", + expectedStatus: &status.ScaleDownStatus{ + Result: status.ScaleDownNoUnneeded, + ScaledDownNodes: []*status.ScaleDownNode{}, + UnremovableNodes: []*status.UnremovableNode{ + { + Node: n1, + BlockingPod: nil, + Reason: simulator.NotUnderutilized, + }, + 
}, + RemovedNodeGroups: []cloudprovider.NodeGroup{}, + NodeDeleteResults: map[string]status.NodeDeleteResult{"n1": { + Err: nil, + ResultType: status.NodeDeleteErrorInternal, + }}, + NodeDeleteResultsAsOf: time.Time{}, + }, + }, + } + + for testName, test := range testCases { + // prevent issues with scoping, we should be able to get rid of that with Go 1.22 + test := test + t.Run(testName, func(t *testing.T) { + t.Parallel() + + mocks := newCommonMocks() + if test.fakeDeletionResults != nil { + tracker := deletiontracker.NewNodeDeletionTracker(time.Second * 0) + for node, result := range test.fakeDeletionResults { + tracker.StartDeletion(test.fakeDeletionResultsNodeGroup, node) + tracker.EndDeletion(test.fakeDeletionResultsNodeGroup, node, result) + } + + mocks.nodeDeletionTracker = tracker + } + setupConfig := &autoscalerSetupConfig{ + autoscalingOptions: options, + nodeGroups: []*nodeGroup{{ + name: "ng1", + min: 0, + max: 10, + nodes: test.nodes, + }}, + nodeStateUpdateTime: now, + mocks: mocks, + clusterStateConfig: clusterstate.ClusterStateRegistryConfig{ + OkTotalUnreadyCount: 1, + }, + } + autoscaler, err := setupAutoscaler(setupConfig) + assert.NoError(t, err) + + statusProcessor := &scaleDownStatusProcessorMock{} + autoscaler.processors.ScaleDownStatusProcessor = statusProcessor + + setupConfig.mocks.readyNodeLister.SetNodes(test.nodes) + setupConfig.mocks.allNodeLister.SetNodes(test.nodes) + setupConfig.mocks.allPodLister.On("List").Return(test.pods, nil) + setupConfig.mocks.daemonSetLister.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil) + setupConfig.mocks.podDisruptionBudgetLister.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil) + setupConfig.mocks.onScaleDown.On("ScaleDown", "ng1", "n2").Return(nil).Maybe() + + err = autoscaler.RunOnce(now.Add(time.Hour)) + assert.NoError(t, err) + + assert.Equal(t, statusProcessor.called, 1) + opts := cmp.Options{ + // These fields are not important for this check and may clutter the whole plot + cmpopts.IgnoreFields(status.UnremovableNode{}, "NodeGroup", "UtilInfo"), + cmpopts.IgnoreFields(status.ScaleDownNode{}, "NodeGroup", "UtilInfo"), + cmpopts.IgnoreFields(status.ScaleDownStatus{}, "NodeDeleteResultsAsOf"), + cmpopts.EquateEmpty(), + } + if diff := cmp.Diff(test.expectedStatus, statusProcessor.scaleDownStatus, opts); diff != "" { + t.Errorf("ScaleDownStatusProcessor.Process(...): err diff (-want +got):\n%s", diff) + } + + mock.AssertExpectationsForObjects(t, + setupConfig.mocks.allPodLister, + setupConfig.mocks.podDisruptionBudgetLister, + setupConfig.mocks.daemonSetLister, + setupConfig.mocks.onScaleUp, + setupConfig.mocks.onScaleDown, + ) + }) + } + +} + func waitForDeleteToFinish(t *testing.T, deleteFinished <-chan bool) { select { case <-deleteFinished: @@ -2405,7 +2656,7 @@ func waitForDeleteToFinish(t *testing.T, deleteFinished <-chan bool) { } } -func newScaleDownPlannerAndActuator(ctx *context.AutoscalingContext, p *ca_processors.AutoscalingProcessors, cs *clusterstate.ClusterStateRegistry) (scaledown.Planner, scaledown.Actuator) { +func newScaleDownPlannerAndActuator(ctx *context.AutoscalingContext, p *ca_processors.AutoscalingProcessors, cs *clusterstate.ClusterStateRegistry, nodeDeletionTracker *deletiontracker.NodeDeletionTracker) (scaledown.Planner, scaledown.Actuator) { ctx.MaxScaleDownParallelism = 10 ctx.MaxDrainParallelism = 1 ctx.NodeDeletionBatcherInterval = 0 * time.Second @@ -2415,9 +2666,12 @@ func newScaleDownPlannerAndActuator(ctx *context.AutoscalingContext, p *ca_proce 
SkipNodesWithLocalStorage: true, SkipNodesWithCustomControllerPods: true, } - ndt := deletiontracker.NewNodeDeletionTracker(0 * time.Second) - sd := legacy.NewScaleDown(ctx, p, ndt, deleteOptions, nil) - actuator := actuation.NewActuator(ctx, cs, ndt, deleteOptions, nil, p.NodeGroupConfigProcessor) + + if nodeDeletionTracker == nil { + nodeDeletionTracker = deletiontracker.NewNodeDeletionTracker(0 * time.Second) + } + sd := legacy.NewScaleDown(ctx, p, nodeDeletionTracker, deleteOptions, nil) + actuator := actuation.NewActuator(ctx, cs, nodeDeletionTracker, deleteOptions, nil, p.NodeGroupConfigProcessor) wrapper := legacy.NewScaleDownWrapper(sd, actuator) return wrapper, wrapper }
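
Because the defer block in static_autoscaler.go now invokes the ScaleDownStatusProcessor on every iteration (including cooldown and early exits) and clears the deletion tracker right after collecting results, a processor can assume each NodeDeleteResult is delivered exactly once. A minimal sketch of a processor built on that guarantee; the type name is hypothetical and the Process/CleanUp signatures are assumed to match the scaleDownStatusProcessorMock used in the tests above.

	import (
		"k8s.io/autoscaler/cluster-autoscaler/context"
		"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"

		klog "k8s.io/klog/v2"
	)

	// loggingScaleDownProcessor (hypothetical) logs every scale-down outcome once per loop.
	type loggingScaleDownProcessor struct{}

	func (p *loggingScaleDownProcessor) Process(_ *context.AutoscalingContext, st *status.ScaleDownStatus) {
		// NodeDeleteResults only covers results gathered since the previous loop,
		// because the autoscaler calls ClearResultsNotNewerThan right after DeletionResults.
		for node, result := range st.NodeDeleteResults {
			klog.V(4).Infof("Scale-down of %s finished: type=%v err=%v", node, result.ResultType, result.Err)
		}
		klog.V(4).Infof("Scale-down iteration result: %v, %d node(s) scaled down", st.Result, len(st.ScaledDownNodes))
	}

	func (p *loggingScaleDownProcessor) CleanUp() {}
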